authentik.providers.oauth2.tests.test_authorize
Test authorize view
"""Test authorize view"""

from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs, urlparse

from django.test import RequestFactory
from django.urls import reverse
from django.utils import translation
from django.utils.timezone import now

from authentik.blueprints.tests import apply_blueprint
from authentik.common.oauth.constants import SCOPE_OFFLINE_ACCESS, SCOPE_OPENID, TOKEN_TYPE
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_brand, create_test_flow
from authentik.events.models import Event, EventAction
from authentik.flows.models import FlowStageBinding
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.lib.utils.time import timedelta_from_string
from authentik.providers.oauth2.errors import AuthorizeError, ClientIdError, RedirectUriError
from authentik.providers.oauth2.models import (
    AccessToken,
    AuthorizationCode,
    GrantTypes,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.views.authorize import OAuthAuthorizationParams
from authentik.stages.dummy.models import DummyStage
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD

# Managed identifiers of the scope mappings that several tests attach to their provider.
# Every test using them applies the "system/providers-oauth2.yaml" blueprint first.
_DEFAULT_MANAGED_SCOPES = [
    "goauthentik.io/providers/oauth2/scope-openid",
    "goauthentik.io/providers/oauth2/scope-email",
    "goauthentik.io/providers/oauth2/scope-profile",
]


class TestAuthorize(OAuthTestCase):
    """Test authorize view"""

    def setUp(self) -> None:
        super().setUp()
        self.factory = RequestFactory()

    # -- helpers ---------------------------------------------------------------------------

    def _parse_authorize_params(self, **params: str) -> OAuthAuthorizationParams:
        """Build a GET request carrying `params` and parse it into authorization params"""
        request = self.factory.get("/", data=params)
        return OAuthAuthorizationParams.from_request(request)

    def _get_authorize(self, **params: str):
        """Send a GET request carrying `params` to the authorize view via the test client"""
        return self.client.get(reverse("authentik_providers_oauth2:authorize"), data=params)

    @staticmethod
    def _patch_login_event():
        """Patch the login event lookup used for ID tokens to return a password login event"""
        return patch(
            "authentik.providers.oauth2.id_token.get_login_event",
            MagicMock(
                return_value=Event(
                    action=EventAction.LOGIN,
                    context={PLAN_CONTEXT_METHOD: "password"},
                    created=now(),
                )
            ),
        )

    @staticmethod
    def _set_default_scopes(provider: OAuth2Provider) -> None:
        """Replace the provider's property mappings with the openid/email/profile scopes"""
        provider.property_mappings.set(
            ScopeMapping.objects.filter(managed__in=_DEFAULT_MANAGED_SCOPES)
        )

    # -- parameter validation --------------------------------------------------------------

    def test_invalid_grant_type(self):
        """Test that an unknown response_type is rejected"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
        )
        with self.assertRaises(AuthorizeError) as cm:
            self._parse_authorize_params(
                response_type="invalid",
                client_id="test",
                redirect_uri="http://local.invalid/Foo",
            )
        self.assertEqual(cm.exception.error, "unsupported_response_type")

    def test_invalid_client_id(self):
        """Test that a client ID without a matching provider is rejected"""
        with self.assertRaises(ClientIdError):
            self._parse_authorize_params(response_type="code", client_id="invalid")

    def test_request(self):
        """Test that passing the `request` parameter is rejected as unsupported"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
        )
        with self.assertRaises(AuthorizeError) as cm:
            self._parse_authorize_params(
                response_type="code",
                client_id="test",
                redirect_uri="http://local.invalid/Foo",
                request="foo",
            )
        self.assertEqual(cm.exception.error, "request_not_supported")

    def test_invalid_redirect_uri_missing(self):
        """Test that a request without a redirect URI is rejected"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
        )
        with self.assertRaises(RedirectUriError) as cm:
            self._parse_authorize_params(response_type="code", client_id="test")
        self.assertEqual(cm.exception.cause, "redirect_uri_missing")

    def test_invalid_redirect_uri(self):
        """Test that a redirect URI not matching the strict entry is rejected"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
        )
        with self.assertRaises(RedirectUriError) as cm:
            self._parse_authorize_params(
                response_type="code",
                client_id="test",
                redirect_uri="http://localhost",
            )
        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

    def test_blocked_redirect_uri(self):
        """Test that a redirect URI with a forbidden scheme is rejected even when configured"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
        )
        with self.assertRaises(RedirectUriError) as cm:
            self._parse_authorize_params(
                response_type="code",
                client_id="test",
                redirect_uri="data:localhost",
            )
        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")

    def test_invalid_redirect_uri_empty(self):
        """Test that the requested redirect URI is saved as a strict entry
        when the provider has no redirect URIs configured"""
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[],
        )
        self._parse_authorize_params(
            response_type="code",
            client_id="test",
            redirect_uri="+",
        )
        provider.refresh_from_db()
        self.assertEqual(provider.redirect_uris, [RedirectURI(RedirectURIMatchingMode.STRICT, "+")])

    def test_invalid_redirect_uri_regex(self):
        """Test that a redirect URI not matching the regex entry is rejected"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
        )
        with self.assertRaises(RedirectUriError) as cm:
            self._parse_authorize_params(
                response_type="code",
                client_id="test",
                redirect_uri="http://localhost",
            )
        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

    def test_redirect_uri_invalid_regex(self):
        """Test that an invalid regex entry ("+") results in a no-match error"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
        )
        with self.assertRaises(RedirectUriError) as cm:
            self._parse_authorize_params(
                response_type="code",
                client_id="test",
                redirect_uri="http://localhost",
            )
        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

    def test_redirect_uri_regex(self):
        """Test that a redirect URI matching the regex entry is accepted"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
        )
        # Passes as long as parsing does not raise
        self._parse_authorize_params(
            response_type="code",
            client_id="test",
            redirect_uri="http://foo.bar.baz",
        )

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_response_type(self):
        """Test the mapping from response_type to grant type"""
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
        )
        self._set_default_scopes(provider)
        # Authorization code
        params = self._parse_authorize_params(
            response_type="code",
            client_id="test",
            redirect_uri="http://local.invalid/Foo",
        )
        self.assertEqual(params.grant_type, GrantTypes.AUTHORIZATION_CODE)
        self.assertEqual(params.redirect_uri, "http://local.invalid/Foo")
        # Implicit
        params = self._parse_authorize_params(
            response_type="id_token",
            client_id="test",
            redirect_uri="http://local.invalid/Foo",
            scope="openid",
            state="foo",
            nonce=generate_id(),
        )
        self.assertEqual(params.grant_type, GrantTypes.IMPLICIT)
        # Implicit without openid scope: parsing itself must raise, so there is no
        # grant type to compare afterwards
        with self.assertRaises(AuthorizeError):
            self._parse_authorize_params(
                response_type="id_token",
                client_id="test",
                redirect_uri="http://local.invalid/Foo",
                state="foo",
            )
        # Hybrid
        params = self._parse_authorize_params(
            response_type="code token",
            client_id="test",
            redirect_uri="http://local.invalid/Foo",
            scope="openid",
            state="foo",
        )
        self.assertEqual(params.grant_type, GrantTypes.HYBRID)
        # Unknown response type
        with self.assertRaises(AuthorizeError) as cm:
            self._parse_authorize_params(
                response_type="invalid",
                client_id="test",
                redirect_uri="http://local.invalid/Foo",
            )
        self.assertEqual(cm.exception.error, "unsupported_response_type")

    # -- full authorization ----------------------------------------------------------------

    def test_full_code(self):
        """Test full authorization (code response in the query string)"""
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        # Request authorization; the response redirects to the redirect URI with the code
        response = self._get_authorize(
            response_type="code",
            client_id="test",
            state=state,
            redirect_uri="foo://localhost",
        )
        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
        # Fail with a clear assertion instead of an AttributeError if no code was issued
        self.assertIsNotNone(code)
        self.assertEqual(
            response.url,
            f"foo://localhost?code={code.code}&state={state}",
        )
        self.assertAlmostEqual(
            code.expires.timestamp() - now().timestamp(),
            timedelta_from_string(provider.access_code_validity).total_seconds(),
            delta=5,
        )

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_full_implicit(self):
        """Test full authorization (implicit, ID token returned in the fragment)"""
        flow = create_test_flow()
        provider: OAuth2Provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=self.keypair,
        )
        self._set_default_scopes(provider)
        # Custom scope whose expression sets the "sub" claim asserted on below
        provider.property_mappings.add(
            ScopeMapping.objects.create(
                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
            )
        )
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        with self._patch_login_event():
            response = self._get_authorize(
                response_type="id_token",
                client_id="test",
                state=state,
                scope="openid test",
                redirect_uri="http://localhost",
                nonce=generate_id(),
            )
            token: AccessToken = AccessToken.objects.filter(user=user).first()
            self.assertIsNotNone(token)
            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
            self.assertEqual(
                response.url,
                (
                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
                    f"&token_type={TOKEN_TYPE}"
                    f"&expires_in={int(expires)}&state={state}"
                ),
            )
            jwt = self.validate_jwt(token, provider)
            self.assertEqual(jwt["amr"], ["pwd"])
            self.assertEqual(jwt["sub"], "foo")
            self.assertAlmostEqual(
                jwt["exp"] - now().timestamp(),
                expires,
                delta=5,
            )

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_full_implicit_enc(self):
        """Test full authorization (implicit) with an encryption key configured"""
        flow = create_test_flow()
        provider: OAuth2Provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=self.keypair,
            encryption_key=self.keypair,
        )
        self._set_default_scopes(provider)
        # Custom scope whose expression sets the "sub" claim asserted on below
        provider.property_mappings.add(
            ScopeMapping.objects.create(
                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
            )
        )
        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        with self._patch_login_event():
            response = self._get_authorize(
                response_type="id_token",
                client_id="test",
                state=state,
                scope="openid test",
                redirect_uri="http://localhost",
                nonce=generate_id(),
            )
            self.assertEqual(response.status_code, 302)
            token: AccessToken = AccessToken.objects.filter(user=user).first()
            self.assertIsNotNone(token)
            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
            jwt = self.validate_jwe(token, provider)
            self.assertEqual(jwt["amr"], ["pwd"])
            self.assertEqual(jwt["sub"], "foo")
            self.assertAlmostEqual(
                jwt["exp"] - now().timestamp(),
                expires,
                delta=5,
            )

    def test_full_fragment_code(self):
        """Test full authorization (code response with response_mode=fragment)"""
        flow = create_test_flow()
        provider: OAuth2Provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=self.keypair,
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        with self._patch_login_event():
            response = self._get_authorize(
                response_type="code",
                response_mode="fragment",
                client_id="test",
                state=state,
                scope="openid",
                redirect_uri="http://localhost",
                nonce=generate_id(),
            )
            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
            self.assertIsNotNone(code)
            self.assertEqual(
                response.url,
                f"http://localhost#code={code.code}&state={state}",
            )
            self.assertAlmostEqual(
                code.expires.timestamp() - now().timestamp(),
                timedelta_from_string(provider.access_code_validity).total_seconds(),
                delta=5,
            )

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_full_form_post_id_token(self):
        """Test full authorization (form_post response, id_token type)"""
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id=generate_id(),
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=self.keypair,
        )
        self._set_default_scopes(provider)
        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        # Step 1, initiate the authorization request
        self._get_authorize(
            response_type="id_token",
            response_mode="form_post",
            client_id=provider.client_id,
            state=state,
            scope="openid",
            redirect_uri="http://localhost",
            nonce=generate_id(),
        )
        # Step 2, query the flow executor, which answers with the autosubmit challenge
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        token: AccessToken = AccessToken.objects.filter(user=user).first()
        self.assertIsNotNone(token)
        self.assertJSONEqual(
            response.content.decode(),
            {
                "component": "ak-stage-autosubmit",
                "url": "http://localhost",
                "title": f"Redirecting to {app.name}...",
                "attrs": {
                    "id_token": provider.encode(token.id_token.to_dict()),
                    "token_type": TOKEN_TYPE,
                    "expires_in": "3600",
                    "state": state,
                },
            },
        )
        self.validate_jwt(token, provider)

    def test_full_form_post_code(self):
        """Test full authorization (form_post response, code type)"""
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id=generate_id(),
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=self.keypair,
        )
        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        # Step 1, initiate the authorization request
        self._get_authorize(
            response_type="code",
            response_mode="form_post",
            client_id=provider.client_id,
            state=state,
            scope="openid",
            redirect_uri="http://localhost",
        )
        # Step 2, query the flow executor, which answers with the autosubmit challenge
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
        )
        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
        self.assertIsNotNone(code)
        self.assertJSONEqual(
            response.content.decode(),
            {
                "component": "ak-stage-autosubmit",
                "url": "http://localhost",
                "title": f"Redirecting to {app.name}...",
                "attrs": {
                    "code": code.code,
                    "state": state,
                },
            },
        )

    # -- scopes ----------------------------------------------------------------------------

    def test_openid_missing_invalid(self):
        """Test that an id_token request without the openid scope is rejected"""
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        )
        with self.assertRaises(AuthorizeError) as cm:
            self._parse_authorize_params(
                response_type="id_token",
                client_id="test",
                redirect_uri="http://localhost",
                scope="",
            )
        self.assertEqual(cm.exception.cause, "scope_openid_missing")

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_offline_access_invalid(self):
        """Test that offline_access is dropped from the scopes of an id_token request"""
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        )
        provider.property_mappings.set(
            ScopeMapping.objects.filter(
                managed__in=[
                    "goauthentik.io/providers/oauth2/scope-openid",
                    "goauthentik.io/providers/oauth2/scope-offline_access",
                ]
            )
        )
        parsed = self._parse_authorize_params(
            response_type="id_token",
            client_id="test",
            redirect_uri="http://localhost",
            scope=f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
            nonce=generate_id(),
        )
        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)

    # -- unauthenticated requests ----------------------------------------------------------

    @apply_blueprint("default/flow-default-authentication-flow.yaml")
    def test_ui_locales(self):
        """Test that the first valid entry of ui_locales is passed on as `locale`"""
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        self.client.logout()
        try:
            response = self._get_authorize(
                response_type="code",
                client_id="test",
                state=state,
                redirect_uri="foo://localhost",
                ui_locales="invalid fr",
            )
            parsed = parse_qs(urlparse(response.url).query)
            self.assertEqual(parsed["locale"], ["fr"])
        finally:
            # Reset the active translation so it cannot affect other tests
            translation.deactivate()

    @apply_blueprint("default/flow-default-authentication-flow.yaml")
    def test_ui_locales_invalid(self):
        """Test that no `locale` is passed on when ui_locales has no valid entry"""
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        self.client.logout()
        response = self._get_authorize(
            response_type="code",
            client_id="test",
            state=state,
            redirect_uri="foo://localhost",
            ui_locales="invalid",
        )
        parsed = parse_qs(urlparse(response.url).query)
        self.assertNotIn("locale", parsed)

    def test_authentication_flow(self):
        """Test that the provider's authentication flow is used instead of the brand's"""
        brand = create_test_brand()
        global_auth = create_test_flow()
        FlowStageBinding.objects.create(
            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
        )
        brand.flow_authentication = global_auth
        brand.save()

        flow = create_test_flow()
        auth_flow = create_test_flow()
        FlowStageBinding.objects.create(
            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
        )
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            authentication_flow=auth_flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        response = self._get_authorize(
            response_type="code",
            client_id="test",
            state=state,
            redirect_uri="foo://localhost",
        )
        self.assertEqual(response.status_code, 302)
        self.assertIn(auth_flow.slug, response.url)
        self.assertNotIn(global_auth.slug, response.url)

    @apply_blueprint("default/flow-default-authentication-flow.yaml")
    def test_login_hint(self):
        """Test that login_hint is stored in the flow plan as pending user identifier"""
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        response = self._get_authorize(
            response_type="code",
            client_id="test",
            state=state,
            redirect_uri="foo://localhost",
            login_hint="foo",
        )
        self.assertEqual(response.status_code, 302)
        plan = self.client.session.get(SESSION_KEY_PLAN)
        self.assertIsNotNone(plan)
        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
38class TestAuthorize(OAuthTestCase): 39 """Test authorize view""" 40 41 def setUp(self) -> None: 42 super().setUp() 43 self.factory = RequestFactory() 44 45 def test_invalid_grant_type(self): 46 """Test with invalid grant type""" 47 OAuth2Provider.objects.create( 48 name=generate_id(), 49 client_id="test", 50 authorization_flow=create_test_flow(), 51 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 52 ) 53 with self.assertRaises(AuthorizeError) as cm: 54 request = self.factory.get( 55 "/", 56 data={ 57 "response_type": "invalid", 58 "client_id": "test", 59 "redirect_uri": "http://local.invalid/Foo", 60 }, 61 ) 62 OAuthAuthorizationParams.from_request(request) 63 self.assertEqual(cm.exception.error, "unsupported_response_type") 64 65 def test_invalid_client_id(self): 66 """Test invalid client ID""" 67 with self.assertRaises(ClientIdError): 68 request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"}) 69 OAuthAuthorizationParams.from_request(request) 70 71 def test_request(self): 72 """test request param""" 73 OAuth2Provider.objects.create( 74 name=generate_id(), 75 client_id="test", 76 authorization_flow=create_test_flow(), 77 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 78 ) 79 with self.assertRaises(AuthorizeError) as cm: 80 request = self.factory.get( 81 "/", 82 data={ 83 "response_type": "code", 84 "client_id": "test", 85 "redirect_uri": "http://local.invalid/Foo", 86 "request": "foo", 87 }, 88 ) 89 OAuthAuthorizationParams.from_request(request) 90 self.assertEqual(cm.exception.error, "request_not_supported") 91 92 def test_invalid_redirect_uri_missing(self): 93 """test missing redirect URI""" 94 OAuth2Provider.objects.create( 95 name=generate_id(), 96 client_id="test", 97 authorization_flow=create_test_flow(), 98 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 99 ) 100 with self.assertRaises(RedirectUriError) as 
cm: 101 request = self.factory.get("/", data={"response_type": "code", "client_id": "test"}) 102 OAuthAuthorizationParams.from_request(request) 103 self.assertEqual(cm.exception.cause, "redirect_uri_missing") 104 105 def test_invalid_redirect_uri(self): 106 """test invalid redirect URI""" 107 OAuth2Provider.objects.create( 108 name=generate_id(), 109 client_id="test", 110 authorization_flow=create_test_flow(), 111 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 112 ) 113 with self.assertRaises(RedirectUriError) as cm: 114 request = self.factory.get( 115 "/", 116 data={ 117 "response_type": "code", 118 "client_id": "test", 119 "redirect_uri": "http://localhost", 120 }, 121 ) 122 OAuthAuthorizationParams.from_request(request) 123 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 124 125 def test_blocked_redirect_uri(self): 126 """test missing/invalid redirect URI""" 127 OAuth2Provider.objects.create( 128 name=generate_id(), 129 client_id="test", 130 authorization_flow=create_test_flow(), 131 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")], 132 ) 133 with self.assertRaises(RedirectUriError) as cm: 134 request = self.factory.get( 135 "/", 136 data={ 137 "response_type": "code", 138 "client_id": "test", 139 "redirect_uri": "data:localhost", 140 }, 141 ) 142 OAuthAuthorizationParams.from_request(request) 143 self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme") 144 145 def test_invalid_redirect_uri_empty(self): 146 """test missing/invalid redirect URI""" 147 provider = OAuth2Provider.objects.create( 148 name=generate_id(), 149 client_id="test", 150 authorization_flow=create_test_flow(), 151 redirect_uris=[], 152 ) 153 request = self.factory.get( 154 "/", 155 data={ 156 "response_type": "code", 157 "client_id": "test", 158 "redirect_uri": "+", 159 }, 160 ) 161 OAuthAuthorizationParams.from_request(request) 162 provider.refresh_from_db() 163 
self.assertEqual(provider.redirect_uris, [RedirectURI(RedirectURIMatchingMode.STRICT, "+")]) 164 165 def test_invalid_redirect_uri_regex(self): 166 """test missing/invalid redirect URI""" 167 OAuth2Provider.objects.create( 168 name=generate_id(), 169 client_id="test", 170 authorization_flow=create_test_flow(), 171 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")], 172 ) 173 with self.assertRaises(RedirectUriError) as cm: 174 request = self.factory.get( 175 "/", 176 data={ 177 "response_type": "code", 178 "client_id": "test", 179 "redirect_uri": "http://localhost", 180 }, 181 ) 182 OAuthAuthorizationParams.from_request(request) 183 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 184 185 def test_redirect_uri_invalid_regex(self): 186 """test missing/invalid redirect URI (invalid regex)""" 187 OAuth2Provider.objects.create( 188 name=generate_id(), 189 client_id="test", 190 authorization_flow=create_test_flow(), 191 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")], 192 ) 193 with self.assertRaises(RedirectUriError) as cm: 194 request = self.factory.get( 195 "/", 196 data={ 197 "response_type": "code", 198 "client_id": "test", 199 "redirect_uri": "http://localhost", 200 }, 201 ) 202 OAuthAuthorizationParams.from_request(request) 203 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 204 205 def test_redirect_uri_regex(self): 206 """test valid redirect URI (regex)""" 207 OAuth2Provider.objects.create( 208 name=generate_id(), 209 client_id="test", 210 authorization_flow=create_test_flow(), 211 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")], 212 ) 213 request = self.factory.get( 214 "/", 215 data={ 216 "response_type": "code", 217 "client_id": "test", 218 "redirect_uri": "http://foo.bar.baz", 219 }, 220 ) 221 OAuthAuthorizationParams.from_request(request) 222 223 @apply_blueprint("system/providers-oauth2.yaml") 224 def test_response_type(self): 225 """test response_type""" 226 
provider = OAuth2Provider.objects.create( 227 name=generate_id(), 228 client_id="test", 229 authorization_flow=create_test_flow(), 230 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 231 ) 232 provider.property_mappings.set( 233 ScopeMapping.objects.filter( 234 managed__in=[ 235 "goauthentik.io/providers/oauth2/scope-openid", 236 "goauthentik.io/providers/oauth2/scope-email", 237 "goauthentik.io/providers/oauth2/scope-profile", 238 ] 239 ) 240 ) 241 request = self.factory.get( 242 "/", 243 data={ 244 "response_type": "code", 245 "client_id": "test", 246 "redirect_uri": "http://local.invalid/Foo", 247 }, 248 ) 249 self.assertEqual( 250 OAuthAuthorizationParams.from_request(request).grant_type, 251 GrantTypes.AUTHORIZATION_CODE, 252 ) 253 self.assertEqual( 254 OAuthAuthorizationParams.from_request(request).redirect_uri, 255 "http://local.invalid/Foo", 256 ) 257 request = self.factory.get( 258 "/", 259 data={ 260 "response_type": "id_token", 261 "client_id": "test", 262 "redirect_uri": "http://local.invalid/Foo", 263 "scope": "openid", 264 "state": "foo", 265 "nonce": generate_id(), 266 }, 267 ) 268 self.assertEqual( 269 OAuthAuthorizationParams.from_request(request).grant_type, 270 GrantTypes.IMPLICIT, 271 ) 272 # Implicit without openid scope 273 with self.assertRaises(AuthorizeError) as cm: 274 request = self.factory.get( 275 "/", 276 data={ 277 "response_type": "id_token", 278 "client_id": "test", 279 "redirect_uri": "http://local.invalid/Foo", 280 "state": "foo", 281 }, 282 ) 283 self.assertEqual( 284 OAuthAuthorizationParams.from_request(request).grant_type, 285 GrantTypes.IMPLICIT, 286 ) 287 request = self.factory.get( 288 "/", 289 data={ 290 "response_type": "code token", 291 "client_id": "test", 292 "redirect_uri": "http://local.invalid/Foo", 293 "scope": "openid", 294 "state": "foo", 295 }, 296 ) 297 self.assertEqual( 298 OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID 299 ) 300 with 
self.assertRaises(AuthorizeError) as cm: 301 request = self.factory.get( 302 "/", 303 data={ 304 "response_type": "invalid", 305 "client_id": "test", 306 "redirect_uri": "http://local.invalid/Foo", 307 }, 308 ) 309 OAuthAuthorizationParams.from_request(request) 310 self.assertEqual(cm.exception.error, "unsupported_response_type") 311 312 def test_full_code(self): 313 """Test full authorization""" 314 flow = create_test_flow() 315 provider = OAuth2Provider.objects.create( 316 name=generate_id(), 317 client_id="test", 318 authorization_flow=flow, 319 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 320 access_code_validity="seconds=100", 321 ) 322 Application.objects.create(name="app", slug="app", provider=provider) 323 state = generate_id() 324 user = create_test_admin_user() 325 self.client.force_login(user) 326 # Step 1, initiate params and get redirect to flow 327 response = self.client.get( 328 reverse("authentik_providers_oauth2:authorize"), 329 data={ 330 "response_type": "code", 331 "client_id": "test", 332 "state": state, 333 "redirect_uri": "foo://localhost", 334 }, 335 ) 336 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 337 self.assertEqual( 338 response.url, 339 f"foo://localhost?code={code.code}&state={state}", 340 ) 341 self.assertAlmostEqual( 342 code.expires.timestamp() - now().timestamp(), 343 timedelta_from_string(provider.access_code_validity).total_seconds(), 344 delta=5, 345 ) 346 347 @apply_blueprint("system/providers-oauth2.yaml") 348 def test_full_implicit(self): 349 """Test full authorization""" 350 flow = create_test_flow() 351 provider: OAuth2Provider = OAuth2Provider.objects.create( 352 name=generate_id(), 353 client_id="test", 354 authorization_flow=flow, 355 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 356 signing_key=self.keypair, 357 ) 358 provider.property_mappings.set( 359 ScopeMapping.objects.filter( 360 managed__in=[ 361 
"goauthentik.io/providers/oauth2/scope-openid", 362 "goauthentik.io/providers/oauth2/scope-email", 363 "goauthentik.io/providers/oauth2/scope-profile", 364 ] 365 ) 366 ) 367 provider.property_mappings.add( 368 ScopeMapping.objects.create( 369 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 370 ) 371 ) 372 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 373 state = generate_id() 374 user = create_test_admin_user() 375 self.client.force_login(user) 376 with patch( 377 "authentik.providers.oauth2.id_token.get_login_event", 378 MagicMock( 379 return_value=Event( 380 action=EventAction.LOGIN, 381 context={PLAN_CONTEXT_METHOD: "password"}, 382 created=now(), 383 ) 384 ), 385 ): 386 # Step 1, initiate params and get redirect to flow 387 response = self.client.get( 388 reverse("authentik_providers_oauth2:authorize"), 389 data={ 390 "response_type": "id_token", 391 "client_id": "test", 392 "state": state, 393 "scope": "openid test", 394 "redirect_uri": "http://localhost", 395 "nonce": generate_id(), 396 }, 397 ) 398 token: AccessToken = AccessToken.objects.filter(user=user).first() 399 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 400 self.assertEqual( 401 response.url, 402 ( 403 f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}" 404 f"&token_type={TOKEN_TYPE}" 405 f"&expires_in={int(expires)}&state={state}" 406 ), 407 ) 408 jwt = self.validate_jwt(token, provider) 409 self.assertEqual(jwt["amr"], ["pwd"]) 410 self.assertEqual(jwt["sub"], "foo") 411 self.assertAlmostEqual( 412 jwt["exp"] - now().timestamp(), 413 expires, 414 delta=5, 415 ) 416 417 @apply_blueprint("system/providers-oauth2.yaml") 418 def test_full_implicit_enc(self): 419 """Test full authorization with encryption""" 420 flow = create_test_flow() 421 provider: OAuth2Provider = OAuth2Provider.objects.create( 422 name=generate_id(), 423 client_id="test", 424 authorization_flow=flow, 425 
redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 426 signing_key=self.keypair, 427 encryption_key=self.keypair, 428 ) 429 provider.property_mappings.set( 430 ScopeMapping.objects.filter( 431 managed__in=[ 432 "goauthentik.io/providers/oauth2/scope-openid", 433 "goauthentik.io/providers/oauth2/scope-email", 434 "goauthentik.io/providers/oauth2/scope-profile", 435 ] 436 ) 437 ) 438 provider.property_mappings.add( 439 ScopeMapping.objects.create( 440 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 441 ) 442 ) 443 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 444 state = generate_id() 445 user = create_test_admin_user() 446 self.client.force_login(user) 447 with patch( 448 "authentik.providers.oauth2.id_token.get_login_event", 449 MagicMock( 450 return_value=Event( 451 action=EventAction.LOGIN, 452 context={PLAN_CONTEXT_METHOD: "password"}, 453 created=now(), 454 ) 455 ), 456 ): 457 # Step 1, initiate params and get redirect to flow 458 response = self.client.get( 459 reverse("authentik_providers_oauth2:authorize"), 460 data={ 461 "response_type": "id_token", 462 "client_id": "test", 463 "state": state, 464 "scope": "openid test", 465 "redirect_uri": "http://localhost", 466 "nonce": generate_id(), 467 }, 468 ) 469 self.assertEqual(response.status_code, 302) 470 token: AccessToken = AccessToken.objects.filter(user=user).first() 471 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 472 jwt = self.validate_jwe(token, provider) 473 self.assertEqual(jwt["amr"], ["pwd"]) 474 self.assertEqual(jwt["sub"], "foo") 475 self.assertAlmostEqual( 476 jwt["exp"] - now().timestamp(), 477 expires, 478 delta=5, 479 ) 480 481 def test_full_fragment_code(self): 482 """Test full authorization""" 483 flow = create_test_flow() 484 provider: OAuth2Provider = OAuth2Provider.objects.create( 485 name=generate_id(), 486 client_id="test", 487 authorization_flow=flow, 
488 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 489 signing_key=self.keypair, 490 ) 491 Application.objects.create(name="app", slug="app", provider=provider) 492 state = generate_id() 493 user = create_test_admin_user() 494 self.client.force_login(user) 495 with patch( 496 "authentik.providers.oauth2.id_token.get_login_event", 497 MagicMock( 498 return_value=Event( 499 action=EventAction.LOGIN, 500 context={PLAN_CONTEXT_METHOD: "password"}, 501 created=now(), 502 ) 503 ), 504 ): 505 # Step 1, initiate params and get redirect to flow 506 response = self.client.get( 507 reverse("authentik_providers_oauth2:authorize"), 508 data={ 509 "response_type": "code", 510 "response_mode": "fragment", 511 "client_id": "test", 512 "state": state, 513 "scope": "openid", 514 "redirect_uri": "http://localhost", 515 "nonce": generate_id(), 516 }, 517 ) 518 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 519 self.assertEqual( 520 response.url, 521 f"http://localhost#code={code.code}&state={state}", 522 ) 523 self.assertAlmostEqual( 524 code.expires.timestamp() - now().timestamp(), 525 timedelta_from_string(provider.access_code_validity).total_seconds(), 526 delta=5, 527 ) 528 529 @apply_blueprint("system/providers-oauth2.yaml") 530 def test_full_form_post_id_token(self): 531 """Test full authorization (form_post response)""" 532 flow = create_test_flow() 533 provider = OAuth2Provider.objects.create( 534 name=generate_id(), 535 client_id=generate_id(), 536 authorization_flow=flow, 537 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 538 signing_key=self.keypair, 539 ) 540 provider.property_mappings.set( 541 ScopeMapping.objects.filter( 542 managed__in=[ 543 "goauthentik.io/providers/oauth2/scope-openid", 544 "goauthentik.io/providers/oauth2/scope-email", 545 "goauthentik.io/providers/oauth2/scope-profile", 546 ] 547 ) 548 ) 549 app = Application.objects.create(name=generate_id(), 
slug=generate_id(), provider=provider) 550 state = generate_id() 551 user = create_test_admin_user() 552 self.client.force_login(user) 553 # Step 1, initiate params and get redirect to flow 554 self.client.get( 555 reverse("authentik_providers_oauth2:authorize"), 556 data={ 557 "response_type": "id_token", 558 "response_mode": "form_post", 559 "client_id": provider.client_id, 560 "state": state, 561 "scope": "openid", 562 "redirect_uri": "http://localhost", 563 "nonce": generate_id(), 564 }, 565 ) 566 response = self.client.get( 567 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 568 ) 569 token: AccessToken = AccessToken.objects.filter(user=user).first() 570 self.assertIsNotNone(token) 571 self.assertJSONEqual( 572 response.content.decode(), 573 { 574 "component": "ak-stage-autosubmit", 575 "url": "http://localhost", 576 "title": f"Redirecting to {app.name}...", 577 "attrs": { 578 "id_token": provider.encode(token.id_token.to_dict()), 579 "token_type": TOKEN_TYPE, 580 "expires_in": "3600", 581 "state": state, 582 }, 583 }, 584 ) 585 self.validate_jwt(token, provider) 586 587 def test_full_form_post_code(self): 588 """Test full authorization (form_post response, code type)""" 589 flow = create_test_flow() 590 provider = OAuth2Provider.objects.create( 591 name=generate_id(), 592 client_id=generate_id(), 593 authorization_flow=flow, 594 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 595 signing_key=self.keypair, 596 ) 597 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 598 state = generate_id() 599 user = create_test_admin_user() 600 self.client.force_login(user) 601 # Step 1, initiate params and get redirect to flow 602 self.client.get( 603 reverse("authentik_providers_oauth2:authorize"), 604 data={ 605 "response_type": "code", 606 "response_mode": "form_post", 607 "client_id": provider.client_id, 608 "state": state, 609 "scope": "openid", 610 "redirect_uri": 
"http://localhost", 611 }, 612 ) 613 response = self.client.get( 614 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 615 ) 616 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 617 self.assertJSONEqual( 618 response.content.decode(), 619 { 620 "component": "ak-stage-autosubmit", 621 "url": "http://localhost", 622 "title": f"Redirecting to {app.name}...", 623 "attrs": { 624 "code": code.code, 625 "state": state, 626 }, 627 }, 628 ) 629 630 def test_openid_missing_invalid(self): 631 """test request requiring an OpenID scope to be set""" 632 OAuth2Provider.objects.create( 633 name=generate_id(), 634 client_id="test", 635 authorization_flow=create_test_flow(), 636 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 637 ) 638 request = self.factory.get( 639 "/", 640 data={ 641 "response_type": "id_token", 642 "client_id": "test", 643 "redirect_uri": "http://localhost", 644 "scope": "", 645 }, 646 ) 647 with self.assertRaises(AuthorizeError) as cm: 648 OAuthAuthorizationParams.from_request(request) 649 self.assertEqual(cm.exception.cause, "scope_openid_missing") 650 651 @apply_blueprint("system/providers-oauth2.yaml") 652 def test_offline_access_invalid(self): 653 """test request for offline_access with invalid response type""" 654 provider = OAuth2Provider.objects.create( 655 name=generate_id(), 656 client_id="test", 657 authorization_flow=create_test_flow(), 658 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 659 ) 660 provider.property_mappings.set( 661 ScopeMapping.objects.filter( 662 managed__in=[ 663 "goauthentik.io/providers/oauth2/scope-openid", 664 "goauthentik.io/providers/oauth2/scope-offline_access", 665 ] 666 ) 667 ) 668 request = self.factory.get( 669 "/", 670 data={ 671 "response_type": "id_token", 672 "client_id": "test", 673 "redirect_uri": "http://localhost", 674 "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}", 675 "nonce": generate_id(), 
676 }, 677 ) 678 parsed = OAuthAuthorizationParams.from_request(request) 679 self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope) 680 681 @apply_blueprint("default/flow-default-authentication-flow.yaml") 682 def test_ui_locales(self): 683 """Test OIDC ui_locales authorization""" 684 flow = create_test_flow() 685 provider = OAuth2Provider.objects.create( 686 name=generate_id(), 687 client_id="test", 688 authorization_flow=flow, 689 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 690 access_code_validity="seconds=100", 691 ) 692 Application.objects.create(name="app", slug="app", provider=provider) 693 state = generate_id() 694 self.client.logout() 695 try: 696 response = self.client.get( 697 reverse("authentik_providers_oauth2:authorize"), 698 data={ 699 "response_type": "code", 700 "client_id": "test", 701 "state": state, 702 "redirect_uri": "foo://localhost", 703 "ui_locales": "invalid fr", 704 }, 705 ) 706 parsed = parse_qs(urlparse(response.url).query) 707 self.assertEqual(parsed["locale"], ["fr"]) 708 finally: 709 translation.deactivate() 710 711 @apply_blueprint("default/flow-default-authentication-flow.yaml") 712 def test_ui_locales_invalid(self): 713 """Test OIDC ui_locales authorization""" 714 flow = create_test_flow() 715 provider = OAuth2Provider.objects.create( 716 name=generate_id(), 717 client_id="test", 718 authorization_flow=flow, 719 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 720 access_code_validity="seconds=100", 721 ) 722 Application.objects.create(name="app", slug="app", provider=provider) 723 state = generate_id() 724 self.client.logout() 725 response = self.client.get( 726 reverse("authentik_providers_oauth2:authorize"), 727 data={ 728 "response_type": "code", 729 "client_id": "test", 730 "state": state, 731 "redirect_uri": "foo://localhost", 732 "ui_locales": "invalid", 733 }, 734 ) 735 parsed = parse_qs(urlparse(response.url).query) 736 self.assertNotIn("locale", parsed) 737 
738 def test_authentication_flow(self): 739 """Test custom authentication flow""" 740 brand = create_test_brand() 741 global_auth = create_test_flow() 742 FlowStageBinding.objects.create( 743 target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10 744 ) 745 brand.flow_authentication = global_auth 746 brand.save() 747 748 flow = create_test_flow() 749 auth_flow = create_test_flow() 750 FlowStageBinding.objects.create( 751 target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10 752 ) 753 provider = OAuth2Provider.objects.create( 754 name=generate_id(), 755 client_id="test", 756 authorization_flow=flow, 757 authentication_flow=auth_flow, 758 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 759 access_code_validity="seconds=100", 760 ) 761 Application.objects.create(name="app", slug="app", provider=provider) 762 state = generate_id() 763 response = self.client.get( 764 reverse("authentik_providers_oauth2:authorize"), 765 data={ 766 "response_type": "code", 767 "client_id": "test", 768 "state": state, 769 "redirect_uri": "foo://localhost", 770 }, 771 ) 772 self.assertEqual(response.status_code, 302) 773 self.assertIn(auth_flow.slug, response.url) 774 self.assertNotIn(global_auth.slug, response.url) 775 776 @apply_blueprint("default/flow-default-authentication-flow.yaml") 777 def test_login_hint(self): 778 """Login hint""" 779 flow = create_test_flow() 780 provider = OAuth2Provider.objects.create( 781 name=generate_id(), 782 client_id="test", 783 authorization_flow=flow, 784 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 785 access_code_validity="seconds=100", 786 ) 787 Application.objects.create(name="app", slug="app", provider=provider) 788 state = generate_id() 789 response = self.client.get( 790 reverse("authentik_providers_oauth2:authorize"), 791 data={ 792 "response_type": "code", 793 "client_id": "test", 794 "state": state, 795 "redirect_uri": 
"foo://localhost", 796 "login_hint": "foo", 797 }, 798 ) 799 self.assertEqual(response.status_code, 302) 800 plan = self.client.session.get(SESSION_KEY_PLAN) 801 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
Test authorize view
def test_invalid_grant_type(self):
    """An unknown response_type is rejected with unsupported_response_type"""
    allowed_uri = RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[allowed_uri],
    )
    query = {
        "response_type": "invalid",
        "client_id": "test",
        "redirect_uri": "http://local.invalid/Foo",
    }
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(AuthorizeError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.error, "unsupported_response_type")
Test with invalid grant type
def test_invalid_client_id(self):
    """Parsing a request with the client_id "invalid" raises ClientIdError"""
    query = {"response_type": "code", "client_id": "invalid"}
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(ClientIdError):
        OAuthAuthorizationParams.from_request(incoming)
Test invalid client ID
def test_request(self):
    """A request carrying the ``request`` parameter fails with request_not_supported"""
    allowed_uri = RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[allowed_uri],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "http://local.invalid/Foo",
        "request": "foo",
    }
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(AuthorizeError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.error, "request_not_supported")
test request param
def test_invalid_redirect_uri_missing(self):
    """A request without redirect_uri fails with cause redirect_uri_missing"""
    allowed_uri = RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[allowed_uri],
    )
    # Deliberately no "redirect_uri" key in the query.
    query = {"response_type": "code", "client_id": "test"}
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(RedirectUriError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.cause, "redirect_uri_missing")
test missing redirect URI
def test_invalid_redirect_uri(self):
    """A redirect_uri differing from the strict entry fails with redirect_uri_no_match"""
    allowed_uri = RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[allowed_uri],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "http://localhost",
    }
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(RedirectUriError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.cause, "redirect_uri_no_match")
test invalid redirect URI
def test_blocked_redirect_uri(self):
    """A matching ``data:`` redirect URI still fails with redirect_uri_forbidden_scheme"""
    # The requested URI equals the configured one, so the failure asserted
    # below is not a mismatch.
    blocked_uri = RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[blocked_uri],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "data:localhost",
    }
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(RedirectUriError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.cause, "redirect_uri_forbidden_scheme")
test redirect URI with a forbidden scheme
def test_invalid_redirect_uri_empty(self):
    """With no redirect URIs configured, the requested URI is stored as a strict entry"""
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "+",
    }
    OAuthAuthorizationParams.from_request(self.factory.get("/", data=query))
    # Reload from the database to observe what parsing the request persisted.
    provider.refresh_from_db()
    expected_uris = [RedirectURI(RedirectURIMatchingMode.STRICT, "+")]
    self.assertEqual(provider.redirect_uris, expected_uris)
test that an empty redirect URI list is populated with the requested URI
def test_invalid_redirect_uri_regex(self):
    """A redirect_uri not matching the regex entry fails with redirect_uri_no_match"""
    pattern_uri = RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[pattern_uri],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "http://localhost",
    }
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(RedirectUriError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.cause, "redirect_uri_no_match")
test redirect URI that does not match the configured regex
def test_redirect_uri_invalid_regex(self):
    """A regex entry of ``+`` (not a valid pattern) yields redirect_uri_no_match"""
    broken_pattern = RedirectURI(RedirectURIMatchingMode.REGEX, "+")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[broken_pattern],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "http://localhost",
    }
    incoming = self.factory.get("/", data=query)
    with self.assertRaises(RedirectUriError) as raised:
        OAuthAuthorizationParams.from_request(incoming)
    self.assertEqual(raised.exception.cause, "redirect_uri_no_match")
test missing/invalid redirect URI (invalid regex)
def test_redirect_uri_regex(self):
    """A redirect_uri matching the regex entry ``.+`` parses without raising"""
    catch_all = RedirectURI(RedirectURIMatchingMode.REGEX, ".+")
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[catch_all],
    )
    query = {
        "response_type": "code",
        "client_id": "test",
        "redirect_uri": "http://foo.bar.baz",
    }
    # No assertion needed: the test passes as long as parsing does not raise.
    OAuthAuthorizationParams.from_request(self.factory.get("/", data=query))
test valid redirect URI (regex)
@apply_blueprint("system/providers-oauth2.yaml")
def test_response_type(self):
    """Check the grant type derived from each response_type.

    Covers ``code`` (authorization code), ``id_token`` (implicit),
    ``code token`` (hybrid), an implicit request that must be rejected,
    and an unknown response_type.
    """
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
    )
    provider.property_mappings.set(
        ScopeMapping.objects.filter(
            managed__in=[
                "goauthentik.io/providers/oauth2/scope-openid",
                "goauthentik.io/providers/oauth2/scope-email",
                "goauthentik.io/providers/oauth2/scope-profile",
            ]
        )
    )

    # Authorization code; the parsed params also carry the redirect URI.
    request = self.factory.get(
        "/",
        data={
            "response_type": "code",
            "client_id": "test",
            "redirect_uri": "http://local.invalid/Foo",
        },
    )
    params = OAuthAuthorizationParams.from_request(request)
    self.assertEqual(params.grant_type, GrantTypes.AUTHORIZATION_CODE)
    self.assertEqual(params.redirect_uri, "http://local.invalid/Foo")

    # Implicit
    request = self.factory.get(
        "/",
        data={
            "response_type": "id_token",
            "client_id": "test",
            "redirect_uri": "http://local.invalid/Foo",
            "scope": "openid",
            "state": "foo",
            "nonce": generate_id(),
        },
    )
    self.assertEqual(
        OAuthAuthorizationParams.from_request(request).grant_type,
        GrantTypes.IMPLICIT,
    )

    # Implicit without openid scope
    # NOTE(review): this request also omits "nonce"; confirm which check raises.
    request = self.factory.get(
        "/",
        data={
            "response_type": "id_token",
            "client_id": "test",
            "redirect_uri": "http://local.invalid/Foo",
            "state": "foo",
        },
    )
    # Parsing itself must raise, so there is no grant type to compare.
    with self.assertRaises(AuthorizeError):
        OAuthAuthorizationParams.from_request(request)

    # Hybrid
    request = self.factory.get(
        "/",
        data={
            "response_type": "code token",
            "client_id": "test",
            "redirect_uri": "http://local.invalid/Foo",
            "scope": "openid",
            "state": "foo",
        },
    )
    self.assertEqual(
        OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID
    )

    # Unknown response type
    request = self.factory.get(
        "/",
        data={
            "response_type": "invalid",
            "client_id": "test",
            "redirect_uri": "http://local.invalid/Foo",
        },
    )
    with self.assertRaises(AuthorizeError) as cm:
        OAuthAuthorizationParams.from_request(request)
    self.assertEqual(cm.exception.error, "unsupported_response_type")
test response_type
def test_full_code(self):
    """Test full authorization (code flow, query response).

    Logs in an admin user, requests ``response_type=code`` and checks that
    the redirect carries the issued code and state, and that the code
    expires according to the provider's ``access_code_validity``.
    """
    flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    # Step 1, initiate params and get redirect to flow
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data={
            "response_type": "code",
            "client_id": "test",
            "state": state,
            "redirect_uri": "foo://localhost",
        },
    )
    code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
    # Fail with a clear assertion instead of an AttributeError if no code was issued.
    self.assertIsNotNone(code)
    self.assertEqual(
        response.url,
        f"foo://localhost?code={code.code}&state={state}",
    )
    # Remaining lifetime should equal the configured validity, within 5 seconds.
    self.assertAlmostEqual(
        code.expires.timestamp() - now().timestamp(),
        timedelta_from_string(provider.access_code_validity).total_seconds(),
        delta=5,
    )
Test full authorization
@apply_blueprint("system/providers-oauth2.yaml")
def test_full_implicit(self):
    """Test full authorization (implicit flow, id_token in the URL fragment).

    Logs in an admin user, requests ``response_type=id_token`` with the
    custom ``test`` scope, then checks the redirect URL and the ``amr``,
    ``sub`` and ``exp`` claims of the issued token.
    """
    flow = create_test_flow()
    provider: OAuth2Provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=self.keypair,
    )
    provider.property_mappings.set(
        ScopeMapping.objects.filter(
            managed__in=[
                "goauthentik.io/providers/oauth2/scope-openid",
                "goauthentik.io/providers/oauth2/scope-email",
                "goauthentik.io/providers/oauth2/scope-profile",
            ]
        )
    )
    # Custom scope whose expression returns sub="foo"; asserted on below.
    provider.property_mappings.add(
        ScopeMapping.objects.create(
            name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
        )
    )
    Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    # Stub the login-event lookup with a password login event.
    # NOTE(review): the patch scope is assumed to cover the assertions below - confirm.
    with patch(
        "authentik.providers.oauth2.id_token.get_login_event",
        MagicMock(
            return_value=Event(
                action=EventAction.LOGIN,
                context={PLAN_CONTEXT_METHOD: "password"},
                created=now(),
            )
        ),
    ):
        # Step 1, initiate params and get redirect to flow
        response = self.client.get(
            reverse("authentik_providers_oauth2:authorize"),
            data={
                "response_type": "id_token",
                "client_id": "test",
                "state": state,
                "scope": "openid test",
                "redirect_uri": "http://localhost",
                "nonce": generate_id(),
            },
        )
        token: AccessToken = AccessToken.objects.filter(user=user).first()
        # Fail with a clear assertion instead of an AttributeError if no token was issued.
        self.assertIsNotNone(token)
        expires = timedelta_from_string(provider.access_token_validity).total_seconds()
        self.assertEqual(
            response.url,
            (
                f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
                f"&token_type={TOKEN_TYPE}"
                f"&expires_in={int(expires)}&state={state}"
            ),
        )
        jwt = self.validate_jwt(token, provider)
        self.assertEqual(jwt["amr"], ["pwd"])
        self.assertEqual(jwt["sub"], "foo")
        # Remaining lifetime should equal the configured validity, within 5 seconds.
        self.assertAlmostEqual(
            jwt["exp"] - now().timestamp(),
            expires,
            delta=5,
        )
Test full authorization (implicit flow)
@apply_blueprint("system/providers-oauth2.yaml")
def test_full_implicit_enc(self):
    """Test full authorization with encryption (implicit flow).

    Same setup as the unencrypted implicit test, but the provider also has
    an ``encryption_key``; the issued token is checked via ``validate_jwe``.
    """
    flow = create_test_flow()
    provider: OAuth2Provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=self.keypair,
        encryption_key=self.keypair,
    )
    provider.property_mappings.set(
        ScopeMapping.objects.filter(
            managed__in=[
                "goauthentik.io/providers/oauth2/scope-openid",
                "goauthentik.io/providers/oauth2/scope-email",
                "goauthentik.io/providers/oauth2/scope-profile",
            ]
        )
    )
    # Custom scope whose expression returns sub="foo"; asserted on below.
    provider.property_mappings.add(
        ScopeMapping.objects.create(
            name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
        )
    )
    Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    # Stub the login-event lookup with a password login event.
    # NOTE(review): the patch scope is assumed to cover the assertions below - confirm.
    with patch(
        "authentik.providers.oauth2.id_token.get_login_event",
        MagicMock(
            return_value=Event(
                action=EventAction.LOGIN,
                context={PLAN_CONTEXT_METHOD: "password"},
                created=now(),
            )
        ),
    ):
        # Step 1, initiate params and get redirect to flow
        response = self.client.get(
            reverse("authentik_providers_oauth2:authorize"),
            data={
                "response_type": "id_token",
                "client_id": "test",
                "state": state,
                "scope": "openid test",
                "redirect_uri": "http://localhost",
                "nonce": generate_id(),
            },
        )
        self.assertEqual(response.status_code, 302)
        token: AccessToken = AccessToken.objects.filter(user=user).first()
        # Fail with a clear assertion before validate_jwe if no token was issued.
        self.assertIsNotNone(token)
        expires = timedelta_from_string(provider.access_token_validity).total_seconds()
        jwt = self.validate_jwe(token, provider)
        self.assertEqual(jwt["amr"], ["pwd"])
        self.assertEqual(jwt["sub"], "foo")
        # Remaining lifetime should equal the configured validity, within 5 seconds.
        self.assertAlmostEqual(
            jwt["exp"] - now().timestamp(),
            expires,
            delta=5,
        )
Test full authorization with encryption
def test_full_fragment_code(self):
    """Test full authorization (code flow, ``response_mode=fragment``).

    Checks that the code and state are returned after ``#`` in the redirect
    URL and that the code expires according to ``access_code_validity``.
    """
    flow = create_test_flow()
    provider: OAuth2Provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=self.keypair,
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    # Stub the login-event lookup with a password login event.
    # NOTE(review): the patch scope is assumed to cover the assertions below - confirm.
    with patch(
        "authentik.providers.oauth2.id_token.get_login_event",
        MagicMock(
            return_value=Event(
                action=EventAction.LOGIN,
                context={PLAN_CONTEXT_METHOD: "password"},
                created=now(),
            )
        ),
    ):
        # Step 1, initiate params and get redirect to flow
        response = self.client.get(
            reverse("authentik_providers_oauth2:authorize"),
            data={
                "response_type": "code",
                "response_mode": "fragment",
                "client_id": "test",
                "state": state,
                "scope": "openid",
                "redirect_uri": "http://localhost",
                "nonce": generate_id(),
            },
        )
        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
        # Fail with a clear assertion instead of an AttributeError if no code was issued.
        self.assertIsNotNone(code)
        self.assertEqual(
            response.url,
            f"http://localhost#code={code.code}&state={state}",
        )
        # Remaining lifetime should equal the configured validity, within 5 seconds.
        self.assertAlmostEqual(
            code.expires.timestamp() - now().timestamp(),
            timedelta_from_string(provider.access_code_validity).total_seconds(),
            delta=5,
        )
Test full authorization (code flow, fragment response mode)
@apply_blueprint("system/providers-oauth2.yaml")
def test_full_form_post_id_token(self):
    """Implicit id_token flow with form_post: the executor answers with an autosubmit stage"""
    authz_flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id=generate_id(),
        authorization_flow=authz_flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=self.keypair,
    )
    managed_scopes = [
        "goauthentik.io/providers/oauth2/scope-openid",
        "goauthentik.io/providers/oauth2/scope-email",
        "goauthentik.io/providers/oauth2/scope-profile",
    ]
    provider.property_mappings.set(ScopeMapping.objects.filter(managed__in=managed_scopes))
    application = Application.objects.create(
        name=generate_id(), slug=generate_id(), provider=provider
    )
    state = generate_id()
    admin = create_test_admin_user()
    self.client.force_login(admin)

    # First request only starts the flow; its response is not inspected.
    authorize_url = reverse("authentik_providers_oauth2:authorize")
    self.client.get(
        authorize_url,
        data={
            "response_type": "id_token",
            "response_mode": "form_post",
            "client_id": provider.client_id,
            "state": state,
            "scope": "openid",
            "redirect_uri": "http://localhost",
            "nonce": generate_id(),
        },
    )
    # Second request asks the flow executor for the resulting challenge.
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": authz_flow.slug})
    response = self.client.get(executor_url)

    issued: AccessToken = AccessToken.objects.filter(user=admin).first()
    self.assertIsNotNone(issued)
    expected_challenge = {
        "component": "ak-stage-autosubmit",
        "url": "http://localhost",
        "title": f"Redirecting to {application.name}...",
        "attrs": {
            "id_token": provider.encode(issued.id_token.to_dict()),
            "token_type": TOKEN_TYPE,
            "expires_in": "3600",
            "state": state,
        },
    }
    self.assertJSONEqual(response.content.decode(), expected_challenge)
    self.validate_jwt(issued, provider)
Test full authorization (form_post response)
def test_full_form_post_code(self):
    """Test full authorization (form_post response, code type).

    The flow executor response is expected to be an autosubmit challenge
    whose attributes carry the issued authorization code and the state.
    """
    flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id=generate_id(),
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=self.keypair,
    )
    app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    # Step 1, initiate params and get redirect to flow
    self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data={
            "response_type": "code",
            "response_mode": "form_post",
            "client_id": provider.client_id,
            "state": state,
            "scope": "openid",
            "redirect_uri": "http://localhost",
        },
    )
    # Step 2, ask the flow executor for the resulting challenge
    response = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
    )
    code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
    # Fail with a clear assertion instead of an AttributeError on
    # ``code.code`` when no authorization code was issued.
    self.assertIsNotNone(code)
    self.assertJSONEqual(
        response.content.decode(),
        {
            "component": "ak-stage-autosubmit",
            "url": "http://localhost",
            "title": f"Redirecting to {app.name}...",
            "attrs": {
                "code": code.code,
                "state": state,
            },
        },
    )
Test full authorization (form_post response, code type)
def test_openid_missing_invalid(self):
    """Test that an id_token request with an empty scope is rejected.

    Parsing the request must raise ``AuthorizeError`` with the cause
    ``scope_openid_missing``.
    """
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
    )
    query = {
        "response_type": "id_token",
        "client_id": "test",
        "redirect_uri": "http://localhost",
        "scope": "",
    }
    request = self.factory.get("/", data=query)
    with self.assertRaises(AuthorizeError) as raised:
        OAuthAuthorizationParams.from_request(request)
    # Inspect the captured exception only after the context manager exits.
    self.assertEqual(raised.exception.cause, "scope_openid_missing")
test request requiring an OpenID scope to be set
@apply_blueprint("system/providers-oauth2.yaml")
def test_offline_access_invalid(self):
    """Test offline_access requested together with the id_token response type.

    The request still parses, but ``offline_access`` must not be part of the
    resulting scope.
    """
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
    )
    managed_scopes = [
        "goauthentik.io/providers/oauth2/scope-openid",
        "goauthentik.io/providers/oauth2/scope-offline_access",
    ]
    provider.property_mappings.set(ScopeMapping.objects.filter(managed__in=managed_scopes))
    query = {
        "response_type": "id_token",
        "client_id": "test",
        "redirect_uri": "http://localhost",
        "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
        "nonce": generate_id(),
    }
    request = self.factory.get("/", data=query)
    params = OAuthAuthorizationParams.from_request(request)
    self.assertNotIn(SCOPE_OFFLINE_ACCESS, params.scope)
test request for offline_access with invalid response type
@apply_blueprint("default/flow-default-authentication-flow.yaml")
def test_ui_locales(self):
    """Test OIDC ui_locales authorization with a valid locale in the list.

    With ``ui_locales`` set to ``"invalid fr"``, the redirect URL's query is
    expected to carry ``locale=fr``.
    """
    authorization_flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=authorization_flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    self.client.logout()
    authorize_params = {
        "response_type": "code",
        "client_id": "test",
        "state": state,
        "redirect_uri": "foo://localhost",
        "ui_locales": "invalid fr",
    }
    try:
        response = self.client.get(
            reverse("authentik_providers_oauth2:authorize"),
            data=authorize_params,
        )
        redirect_query = parse_qs(urlparse(response.url).query)
        self.assertEqual(redirect_query["locale"], ["fr"])
    finally:
        # Always reset the active translation, even if an assertion failed.
        translation.deactivate()
Test OIDC ui_locales authorization
@apply_blueprint("default/flow-default-authentication-flow.yaml")
def test_ui_locales_invalid(self):
    """Test OIDC ui_locales authorization with only an invalid locale.

    With ``ui_locales`` set to ``"invalid"``, the redirect URL's query must
    not contain a ``locale`` parameter.
    """
    authorization_flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=authorization_flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    self.client.logout()
    authorize_params = {
        "response_type": "code",
        "client_id": "test",
        "state": state,
        "redirect_uri": "foo://localhost",
        "ui_locales": "invalid",
    }
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data=authorize_params,
    )
    redirect_query = parse_qs(urlparse(response.url).query)
    self.assertNotIn("locale", redirect_query)
Test OIDC ui_locales authorization with an invalid locale
def test_authentication_flow(self):
    """Test that a provider's own authentication flow is used for the redirect.

    An unauthenticated authorize request must be redirected (302) to a URL
    containing the provider's authentication flow slug and not the slug of
    the brand's authentication flow.
    """
    # Brand-level authentication flow, which must NOT be picked
    brand = create_test_brand()
    brand_auth_flow = create_test_flow()
    brand_stage = DummyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(target=brand_auth_flow, stage=brand_stage, order=10)
    brand.flow_authentication = brand_auth_flow
    brand.save()

    # Provider-level authentication flow, which must be picked
    authorization_flow = create_test_flow()
    provider_auth_flow = create_test_flow()
    provider_stage = DummyStage.objects.create(name=generate_id())
    FlowStageBinding.objects.create(target=provider_auth_flow, stage=provider_stage, order=10)
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=authorization_flow,
        authentication_flow=provider_auth_flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
    )
    Application.objects.create(name="app", slug="app", provider=provider)

    state = generate_id()
    authorize_params = {
        "response_type": "code",
        "client_id": "test",
        "state": state,
        "redirect_uri": "foo://localhost",
    }
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data=authorize_params,
    )
    self.assertEqual(response.status_code, 302)
    self.assertIn(provider_auth_flow.slug, response.url)
    self.assertNotIn(brand_auth_flow.slug, response.url)
Test custom authentication flow
@apply_blueprint("default/flow-default-authentication-flow.yaml")
def test_login_hint(self):
    """Test that ``login_hint`` is stored in the flow plan context.

    After an unauthenticated authorize request with ``login_hint=foo``, the
    plan stored in the session must carry ``"foo"`` under
    ``PLAN_CONTEXT_PENDING_USER_IDENTIFIER``.
    """
    flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data={
            "response_type": "code",
            "client_id": "test",
            "state": state,
            "redirect_uri": "foo://localhost",
            "login_hint": "foo",
        },
    )
    self.assertEqual(response.status_code, 302)
    plan = self.client.session.get(SESSION_KEY_PLAN)
    # ``session.get`` returns None when no plan was stored; assert explicitly
    # so the failure is not an AttributeError on ``plan.context``.
    self.assertIsNotNone(plan)
    self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
Login hint