authentik.providers.oauth2.tests.test_authorize
Test authorize view
1"""Test authorize view""" 2 3from unittest.mock import MagicMock, patch 4from urllib.parse import parse_qs, urlparse 5 6from django.test import RequestFactory 7from django.urls import reverse 8from django.utils import translation 9from django.utils.timezone import now 10 11from authentik.blueprints.tests import apply_blueprint 12from authentik.common.oauth.constants import SCOPE_OFFLINE_ACCESS, SCOPE_OPENID, TOKEN_TYPE 13from authentik.core.models import Application 14from authentik.core.tests.utils import create_test_admin_user, create_test_brand, create_test_flow 15from authentik.events.models import Event, EventAction 16from authentik.flows.models import FlowStageBinding 17from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER 18from authentik.flows.views.executor import SESSION_KEY_PLAN 19from authentik.lib.generators import generate_id 20from authentik.lib.utils.time import timedelta_from_string 21from authentik.providers.oauth2.errors import AuthorizeError, ClientIdError, RedirectUriError 22from authentik.providers.oauth2.models import ( 23 AccessToken, 24 AuthorizationCode, 25 GrantType, 26 OAuth2Provider, 27 RedirectURI, 28 RedirectURIMatchingMode, 29 ScopeMapping, 30) 31from authentik.providers.oauth2.tests.utils import OAuthTestCase 32from authentik.providers.oauth2.views.authorize import OAuthAuthorizationParams 33from authentik.stages.dummy.models import DummyStage 34from authentik.stages.password.stage import PLAN_CONTEXT_METHOD 35 36 37class TestAuthorize(OAuthTestCase): 38 """Test authorize view""" 39 40 def setUp(self) -> None: 41 super().setUp() 42 self.factory = RequestFactory() 43 44 def test_disallowed_grant_type(self): 45 """Test with disallowed grant type""" 46 OAuth2Provider.objects.create( 47 name=generate_id(), 48 client_id="test", 49 grant_types=[], 50 authorization_flow=create_test_flow(), 51 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 52 ) 53 with self.assertRaises(AuthorizeError) as cm: 54 request = self.factory.get( 55 "/", 56 data={ 57 "response_type": "code", 58 "client_id": "test", 59 "redirect_uri": "http://local.invalid/Foo", 60 }, 61 ) 62 OAuthAuthorizationParams.from_request(request) 63 self.assertEqual(cm.exception.error, "invalid_request") 64 65 def test_invalid_grant_type(self): 66 """Test with invalid grant type""" 67 OAuth2Provider.objects.create( 68 name=generate_id(), 69 client_id="test", 70 authorization_flow=create_test_flow(), 71 grant_types=[GrantType.AUTHORIZATION_CODE], 72 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 73 ) 74 with self.assertRaises(AuthorizeError) as cm: 75 request = self.factory.get( 76 "/", 77 data={ 78 "response_type": "invalid", 79 "client_id": "test", 80 "redirect_uri": "http://local.invalid/Foo", 81 }, 82 ) 83 OAuthAuthorizationParams.from_request(request) 84 self.assertEqual(cm.exception.error, "unsupported_response_type") 85 86 def test_invalid_client_id(self): 87 """Test invalid client ID""" 88 with self.assertRaises(ClientIdError): 89 request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"}) 90 OAuthAuthorizationParams.from_request(request) 91 92 def test_request(self): 93 """test request param""" 94 OAuth2Provider.objects.create( 95 name=generate_id(), 96 client_id="test", 97 authorization_flow=create_test_flow(), 98 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 99 grant_types=[GrantType.AUTHORIZATION_CODE], 100 ) 101 with self.assertRaises(AuthorizeError) as cm: 102 request = self.factory.get( 103 "/", 104 data={ 105 "response_type": "code", 106 "client_id": "test", 107 "redirect_uri": "http://local.invalid/Foo", 108 "request": "foo", 109 }, 110 ) 111 OAuthAuthorizationParams.from_request(request) 112 self.assertEqual(cm.exception.error, "request_not_supported") 113 114 def test_invalid_redirect_uri_missing(self): 115 """test missing redirect URI""" 116 OAuth2Provider.objects.create( 117 name=generate_id(), 118 client_id="test", 119 authorization_flow=create_test_flow(), 120 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 121 ) 122 with self.assertRaises(RedirectUriError) as cm: 123 request = self.factory.get("/", data={"response_type": "code", "client_id": "test"}) 124 OAuthAuthorizationParams.from_request(request) 125 self.assertEqual(cm.exception.cause, "redirect_uri_missing") 126 127 def test_invalid_redirect_uri(self): 128 """test invalid redirect URI""" 129 OAuth2Provider.objects.create( 130 name=generate_id(), 131 client_id="test", 132 authorization_flow=create_test_flow(), 133 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 134 ) 135 with self.assertRaises(RedirectUriError) as cm: 136 request = self.factory.get( 137 "/", 138 data={ 139 "response_type": "code", 140 "client_id": "test", 141 "redirect_uri": "http://localhost", 142 }, 143 ) 144 OAuthAuthorizationParams.from_request(request) 145 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 146 147 def test_blocked_redirect_uri(self): 148 """test missing/invalid redirect URI""" 149 OAuth2Provider.objects.create( 150 name=generate_id(), 151 client_id="test", 152 authorization_flow=create_test_flow(), 153 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")], 154 ) 155 with self.assertRaises(RedirectUriError) as cm: 156 request = self.factory.get( 157 "/", 158 data={ 159 "response_type": "code", 160 "client_id": "test", 161 "redirect_uri": "data:localhost", 162 }, 163 ) 164 OAuthAuthorizationParams.from_request(request) 165 self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme") 166 167 def test_invalid_redirect_uri_regex(self): 168 """test missing/invalid redirect URI""" 169 OAuth2Provider.objects.create( 170 name=generate_id(), 171 client_id="test", 172 authorization_flow=create_test_flow(), 173 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")], 174 ) 175 with self.assertRaises(RedirectUriError) as cm: 176 request = self.factory.get( 177 "/", 178 data={ 179 "response_type": "code", 180 "client_id": "test", 181 "redirect_uri": "http://localhost", 182 }, 183 ) 184 OAuthAuthorizationParams.from_request(request) 185 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 186 187 def test_redirect_uri_invalid_regex(self): 188 """test missing/invalid redirect URI (invalid regex)""" 189 OAuth2Provider.objects.create( 190 name=generate_id(), 191 client_id="test", 192 authorization_flow=create_test_flow(), 193 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")], 194 ) 195 with self.assertRaises(RedirectUriError) as cm: 196 request = self.factory.get( 197 "/", 198 data={ 199 "response_type": "code", 200 "client_id": "test", 201 "redirect_uri": "http://localhost", 202 }, 203 ) 204 OAuthAuthorizationParams.from_request(request) 205 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 206 207 def test_redirect_uri_regex(self): 208 """test valid redirect URI (regex)""" 209 OAuth2Provider.objects.create( 210 name=generate_id(), 211 client_id="test", 212 authorization_flow=create_test_flow(), 213 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")], 214 grant_types=[GrantType.AUTHORIZATION_CODE], 215 ) 216 request = self.factory.get( 217 "/", 218 data={ 219 "response_type": "code", 220 "client_id": "test", 221 "redirect_uri": "http://foo.bar.baz", 222 }, 223 ) 224 OAuthAuthorizationParams.from_request(request) 225 226 @apply_blueprint("system/providers-oauth2.yaml") 227 def test_response_type(self): 228 """test response_type""" 229 provider = OAuth2Provider.objects.create( 230 name=generate_id(), 231 client_id="test", 232 authorization_flow=create_test_flow(), 233 grant_types=[GrantType.AUTHORIZATION_CODE], 234 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 235 ) 236 provider.property_mappings.set( 237 ScopeMapping.objects.filter( 238 managed__in=[ 239 "goauthentik.io/providers/oauth2/scope-openid", 240 "goauthentik.io/providers/oauth2/scope-email", 241 "goauthentik.io/providers/oauth2/scope-profile", 242 ] 243 ) 244 ) 245 request = self.factory.get( 246 "/", 247 data={ 248 "response_type": "code", 249 "client_id": "test", 250 "redirect_uri": "http://local.invalid/Foo", 251 }, 252 ) 253 self.assertEqual( 254 OAuthAuthorizationParams.from_request(request).grant_type, 255 GrantType.AUTHORIZATION_CODE, 256 ) 257 self.assertEqual( 258 OAuthAuthorizationParams.from_request(request).redirect_uri, 259 "http://local.invalid/Foo", 260 ) 261 provider.grant_types = [GrantType.IMPLICIT] 262 provider.save() 263 request = self.factory.get( 264 "/", 265 data={ 266 "response_type": "id_token", 267 "client_id": "test", 268 "redirect_uri": "http://local.invalid/Foo", 269 "scope": "openid", 270 "state": "foo", 271 "nonce": generate_id(), 272 }, 273 ) 274 self.assertEqual( 275 OAuthAuthorizationParams.from_request(request).grant_type, 276 GrantType.IMPLICIT, 277 ) 278 # Implicit without openid scope 279 with self.assertRaises(AuthorizeError) as cm: 280 request = self.factory.get( 281 "/", 282 data={ 283 "response_type": "id_token", 284 "client_id": "test", 285 "redirect_uri": "http://local.invalid/Foo", 286 "state": "foo", 287 }, 288 ) 289 self.assertEqual( 290 OAuthAuthorizationParams.from_request(request).grant_type, 291 GrantType.IMPLICIT, 292 ) 293 provider.grant_types = [GrantType.HYBRID] 294 provider.save() 295 request = self.factory.get( 296 "/", 297 data={ 298 "response_type": "code token", 299 "client_id": "test", 300 "redirect_uri": "http://local.invalid/Foo", 301 "scope": "openid", 302 "state": "foo", 303 }, 304 ) 305 self.assertEqual( 306 OAuthAuthorizationParams.from_request(request).grant_type, GrantType.HYBRID 307 ) 308 with self.assertRaises(AuthorizeError) as cm: 309 request = self.factory.get( 310 "/", 311 data={ 312 "response_type": "invalid", 313 "client_id": "test", 314 "redirect_uri": "http://local.invalid/Foo", 315 }, 316 ) 317 OAuthAuthorizationParams.from_request(request) 318 self.assertEqual(cm.exception.error, "unsupported_response_type") 319 320 def test_full_code(self): 321 """Test full authorization""" 322 flow = create_test_flow() 323 provider = OAuth2Provider.objects.create( 324 name=generate_id(), 325 client_id="test", 326 authorization_flow=flow, 327 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 328 access_code_validity="seconds=100", 329 grant_types=[GrantType.AUTHORIZATION_CODE], 330 ) 331 Application.objects.create(name="app", slug="app", provider=provider) 332 state = generate_id() 333 user = create_test_admin_user() 334 self.client.force_login(user) 335 # Step 1, initiate params and get redirect to flow 336 response = self.client.get( 337 reverse("authentik_providers_oauth2:authorize"), 338 data={ 339 "response_type": "code", 340 "client_id": "test", 341 "state": state, 342 "redirect_uri": "foo://localhost", 343 }, 344 ) 345 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 346 self.assertEqual( 347 response.url, 348 f"foo://localhost?code={code.code}&state={state}", 349 ) 350 self.assertAlmostEqual( 351 code.expires.timestamp() - now().timestamp(), 352 timedelta_from_string(provider.access_code_validity).total_seconds(), 353 delta=5, 354 ) 355 356 @apply_blueprint("system/providers-oauth2.yaml") 357 def test_full_implicit(self): 358 """Test full authorization""" 359 flow = create_test_flow() 360 provider: OAuth2Provider = OAuth2Provider.objects.create( 361 name=generate_id(), 362 client_id="test", 363 authorization_flow=flow, 364 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 365 signing_key=self.keypair, 366 grant_types=[GrantType.IMPLICIT], 367 ) 368 provider.property_mappings.set( 369 ScopeMapping.objects.filter( 370 managed__in=[ 371 "goauthentik.io/providers/oauth2/scope-openid", 372 "goauthentik.io/providers/oauth2/scope-email", 373 "goauthentik.io/providers/oauth2/scope-profile", 374 ] 375 ) 376 ) 377 provider.property_mappings.add( 378 ScopeMapping.objects.create( 379 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 380 ) 381 ) 382 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 383 state = generate_id() 384 user = create_test_admin_user() 385 self.client.force_login(user) 386 with patch( 387 "authentik.providers.oauth2.id_token.get_login_event", 388 MagicMock( 389 return_value=Event( 390 action=EventAction.LOGIN, 391 context={PLAN_CONTEXT_METHOD: "password"}, 392 created=now(), 393 ) 394 ), 395 ): 396 # Step 1, initiate params and get redirect to flow 397 response = self.client.get( 398 reverse("authentik_providers_oauth2:authorize"), 399 data={ 400 "response_type": "id_token", 401 "client_id": "test", 402 "state": state, 403 "scope": "openid test", 404 "redirect_uri": "http://localhost", 405 "nonce": generate_id(), 406 }, 407 ) 408 token = AccessToken.objects.filter(user=user).first() 409 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 410 self.assertEqual( 411 response.url, 412 ( 413 f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}" 414 f"&token_type={TOKEN_TYPE}" 415 f"&expires_in={int(expires)}&state={state}" 416 ), 417 ) 418 jwt = self.validate_jwt(token, provider) 419 self.assertEqual(jwt["amr"], ["pwd"]) 420 self.assertEqual(jwt["sub"], "foo") 421 self.assertAlmostEqual( 422 jwt["exp"] - now().timestamp(), 423 expires, 424 delta=5, 425 ) 426 427 @apply_blueprint("system/providers-oauth2.yaml") 428 def test_full_implicit_enc(self): 429 """Test full authorization with encryption""" 430 flow = create_test_flow() 431 provider: OAuth2Provider = OAuth2Provider.objects.create( 432 name=generate_id(), 433 client_id="test", 434 authorization_flow=flow, 435 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 436 signing_key=self.keypair, 437 encryption_key=self.keypair, 438 grant_types=[GrantType.IMPLICIT], 439 ) 440 provider.property_mappings.set( 441 ScopeMapping.objects.filter( 442 managed__in=[ 443 "goauthentik.io/providers/oauth2/scope-openid", 444 "goauthentik.io/providers/oauth2/scope-email", 445 "goauthentik.io/providers/oauth2/scope-profile", 446 ] 447 ) 448 ) 449 provider.property_mappings.add( 450 ScopeMapping.objects.create( 451 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 452 ) 453 ) 454 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 455 state = generate_id() 456 user = create_test_admin_user() 457 self.client.force_login(user) 458 with patch( 459 "authentik.providers.oauth2.id_token.get_login_event", 460 MagicMock( 461 return_value=Event( 462 action=EventAction.LOGIN, 463 context={PLAN_CONTEXT_METHOD: "password"}, 464 created=now(), 465 ) 466 ), 467 ): 468 # Step 1, initiate params and get redirect to flow 469 response = self.client.get( 470 reverse("authentik_providers_oauth2:authorize"), 471 data={ 472 "response_type": "id_token", 473 "client_id": "test", 474 "state": state, 475 "scope": "openid test", 476 "redirect_uri": "http://localhost", 477 "nonce": generate_id(), 478 }, 479 ) 480 self.assertEqual(response.status_code, 302) 481 token = AccessToken.objects.filter(user=user).first() 482 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 483 jwt = self.validate_jwe(token, provider) 484 self.assertEqual(jwt["amr"], ["pwd"]) 485 self.assertEqual(jwt["sub"], "foo") 486 self.assertAlmostEqual( 487 jwt["exp"] - now().timestamp(), 488 expires, 489 delta=5, 490 ) 491 492 def test_full_fragment_code(self): 493 """Test full authorization""" 494 flow = create_test_flow() 495 provider: OAuth2Provider = OAuth2Provider.objects.create( 496 name=generate_id(), 497 client_id="test", 498 authorization_flow=flow, 499 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 500 signing_key=self.keypair, 501 grant_types=[GrantType.AUTHORIZATION_CODE], 502 ) 503 Application.objects.create(name="app", slug="app", provider=provider) 504 state = generate_id() 505 user = create_test_admin_user() 506 self.client.force_login(user) 507 with patch( 508 "authentik.providers.oauth2.id_token.get_login_event", 509 MagicMock( 510 return_value=Event( 511 action=EventAction.LOGIN, 512 context={PLAN_CONTEXT_METHOD: "password"}, 513 created=now(), 514 ) 515 ), 516 ): 517 # Step 1, initiate params and get redirect to flow 518 response = self.client.get( 519 reverse("authentik_providers_oauth2:authorize"), 520 data={ 521 "response_type": "code", 522 "response_mode": "fragment", 523 "client_id": "test", 524 "state": state, 525 "scope": "openid", 526 "redirect_uri": "http://localhost", 527 "nonce": generate_id(), 528 }, 529 ) 530 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 531 self.assertEqual( 532 response.url, 533 f"http://localhost#code={code.code}&state={state}", 534 ) 535 self.assertAlmostEqual( 536 code.expires.timestamp() - now().timestamp(), 537 timedelta_from_string(provider.access_code_validity).total_seconds(), 538 delta=5, 539 ) 540 541 @apply_blueprint("system/providers-oauth2.yaml") 542 def test_full_form_post_id_token(self): 543 """Test full authorization (form_post response)""" 544 flow = create_test_flow() 545 provider = OAuth2Provider.objects.create( 546 name=generate_id(), 547 client_id=generate_id(), 548 authorization_flow=flow, 549 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 550 signing_key=self.keypair, 551 grant_types=[GrantType.IMPLICIT], 552 ) 553 provider.property_mappings.set( 554 ScopeMapping.objects.filter( 555 managed__in=[ 556 "goauthentik.io/providers/oauth2/scope-openid", 557 "goauthentik.io/providers/oauth2/scope-email", 558 "goauthentik.io/providers/oauth2/scope-profile", 559 ] 560 ) 561 ) 562 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 563 state = generate_id() 564 user = create_test_admin_user() 565 self.client.force_login(user) 566 # Step 1, initiate params and get redirect to flow 567 self.client.get( 568 reverse("authentik_providers_oauth2:authorize"), 569 data={ 570 "response_type": "id_token", 571 "response_mode": "form_post", 572 "client_id": provider.client_id, 573 "state": state, 574 "scope": "openid", 575 "redirect_uri": "http://localhost", 576 "nonce": generate_id(), 577 }, 578 ) 579 response = self.client.get( 580 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 581 ) 582 token = AccessToken.objects.filter(user=user).first() 583 self.assertIsNotNone(token) 584 self.assertJSONEqual( 585 response.content.decode(), 586 { 587 "component": "ak-stage-autosubmit", 588 "url": "http://localhost", 589 "title": f"Redirecting to {app.name}...", 590 "attrs": { 591 "id_token": provider.encode(token.id_token.to_dict()), 592 "token_type": TOKEN_TYPE, 593 "expires_in": "3600", 594 "state": state, 595 }, 596 }, 597 ) 598 self.validate_jwt(token, provider) 599 600 def test_full_form_post_code(self): 601 """Test full authorization (form_post response, code type)""" 602 flow = create_test_flow() 603 provider = OAuth2Provider.objects.create( 604 name=generate_id(), 605 client_id=generate_id(), 606 authorization_flow=flow, 607 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 608 signing_key=self.keypair, 609 grant_types=[GrantType.AUTHORIZATION_CODE], 610 ) 611 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 612 state = generate_id() 613 user = create_test_admin_user() 614 self.client.force_login(user) 615 # Step 1, initiate params and get redirect to flow 616 self.client.get( 617 reverse("authentik_providers_oauth2:authorize"), 618 data={ 619 "response_type": "code", 620 "response_mode": "form_post", 621 "client_id": provider.client_id, 622 "state": state, 623 "scope": "openid", 624 "redirect_uri": "http://localhost", 625 }, 626 ) 627 response = self.client.get( 628 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 629 ) 630 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 631 self.assertJSONEqual( 632 response.content.decode(), 633 { 634 "component": "ak-stage-autosubmit", 635 "url": "http://localhost", 636 "title": f"Redirecting to {app.name}...", 637 "attrs": { 638 "code": code.code, 639 "state": state, 640 }, 641 }, 642 ) 643 644 def test_openid_missing_invalid(self): 645 """test request requiring an OpenID scope to be set""" 646 OAuth2Provider.objects.create( 647 name=generate_id(), 648 client_id="test", 649 authorization_flow=create_test_flow(), 650 grant_types=[GrantType.IMPLICIT], 651 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 652 ) 653 request = self.factory.get( 654 "/", 655 data={ 656 "response_type": "id_token", 657 "client_id": "test", 658 "redirect_uri": "http://localhost", 659 "scope": "", 660 }, 661 ) 662 with self.assertRaises(AuthorizeError) as cm: 663 OAuthAuthorizationParams.from_request(request) 664 self.assertEqual(cm.exception.cause, "scope_openid_missing") 665 666 @apply_blueprint("system/providers-oauth2.yaml") 667 def test_offline_access_invalid(self): 668 """test request for offline_access with invalid response type""" 669 provider = OAuth2Provider.objects.create( 670 name=generate_id(), 671 client_id="test", 672 authorization_flow=create_test_flow(), 673 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 674 grant_types=[GrantType.IMPLICIT], 675 ) 676 provider.property_mappings.set( 677 ScopeMapping.objects.filter( 678 managed__in=[ 679 "goauthentik.io/providers/oauth2/scope-openid", 680 "goauthentik.io/providers/oauth2/scope-offline_access", 681 ] 682 ) 683 ) 684 request = self.factory.get( 685 "/", 686 data={ 687 "response_type": "id_token", 688 "client_id": "test", 689 "redirect_uri": "http://localhost", 690 "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}", 691 "nonce": generate_id(), 692 }, 693 ) 694 parsed = OAuthAuthorizationParams.from_request(request) 695 self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope) 696 697 @apply_blueprint("default/flow-default-authentication-flow.yaml") 698 def test_ui_locales(self): 699 """Test OIDC ui_locales authorization""" 700 flow = create_test_flow() 701 provider = OAuth2Provider.objects.create( 702 name=generate_id(), 703 client_id="test", 704 authorization_flow=flow, 705 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 706 access_code_validity="seconds=100", 707 grant_types=[GrantType.AUTHORIZATION_CODE], 708 ) 709 Application.objects.create(name="app", slug="app", provider=provider) 710 state = generate_id() 711 self.client.logout() 712 try: 713 response = self.client.get( 714 reverse("authentik_providers_oauth2:authorize"), 715 data={ 716 "response_type": "code", 717 "client_id": "test", 718 "state": state, 719 "redirect_uri": "foo://localhost", 720 "ui_locales": "invalid fr", 721 }, 722 ) 723 parsed = parse_qs(urlparse(response.url).query) 724 self.assertEqual(parsed["locale"], ["fr"]) 725 finally: 726 translation.deactivate() 727 728 @apply_blueprint("default/flow-default-authentication-flow.yaml") 729 def test_ui_locales_invalid(self): 730 """Test OIDC ui_locales authorization""" 731 flow = create_test_flow() 732 provider = OAuth2Provider.objects.create( 733 name=generate_id(), 734 client_id="test", 735 authorization_flow=flow, 736 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 737 access_code_validity="seconds=100", 738 grant_types=[GrantType.AUTHORIZATION_CODE], 739 ) 740 Application.objects.create(name="app", slug="app", provider=provider) 741 state = generate_id() 742 self.client.logout() 743 response = self.client.get( 744 reverse("authentik_providers_oauth2:authorize"), 745 data={ 746 "response_type": "code", 747 "client_id": "test", 748 "state": state, 749 "redirect_uri": "foo://localhost", 750 "ui_locales": "invalid", 751 }, 752 ) 753 parsed = parse_qs(urlparse(response.url).query) 754 self.assertNotIn("locale", parsed) 755 756 def test_authentication_flow(self): 757 """Test custom authentication flow""" 758 brand = create_test_brand() 759 global_auth = create_test_flow() 760 FlowStageBinding.objects.create( 761 target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10 762 ) 763 brand.flow_authentication = global_auth 764 brand.save() 765 766 flow = create_test_flow() 767 auth_flow = create_test_flow() 768 FlowStageBinding.objects.create( 769 target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10 770 ) 771 provider = OAuth2Provider.objects.create( 772 name=generate_id(), 773 client_id="test", 774 authorization_flow=flow, 775 authentication_flow=auth_flow, 776 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 777 access_code_validity="seconds=100", 778 grant_types=[GrantType.AUTHORIZATION_CODE], 779 ) 780 Application.objects.create(name="app", slug="app", provider=provider) 781 state = generate_id() 782 response = self.client.get( 783 reverse("authentik_providers_oauth2:authorize"), 784 data={ 785 "response_type": "code", 786 "client_id": "test", 787 "state": state, 788 "redirect_uri": "foo://localhost", 789 }, 790 ) 791 self.assertEqual(response.status_code, 302) 792 self.assertIn(auth_flow.slug, response.url) 793 self.assertNotIn(global_auth.slug, response.url) 794 795 @apply_blueprint("default/flow-default-authentication-flow.yaml") 796 def test_login_hint(self): 797 """Login hint""" 798 flow = create_test_flow() 799 provider = OAuth2Provider.objects.create( 800 name=generate_id(), 801 client_id="test", 802 authorization_flow=flow, 803 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 804 access_code_validity="seconds=100", 805 grant_types=[GrantType.AUTHORIZATION_CODE], 806 ) 807 Application.objects.create(name="app", slug="app", provider=provider) 808 state = generate_id() 809 response = self.client.get( 810 reverse("authentik_providers_oauth2:authorize"), 811 data={ 812 "response_type": "code", 813 "client_id": "test", 814 "state": state, 815 "redirect_uri": "foo://localhost", 816 "login_hint": "foo", 817 }, 818 ) 819 self.assertEqual(response.status_code, 302) 820 plan = self.client.session.get(SESSION_KEY_PLAN) 821 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
38class TestAuthorize(OAuthTestCase): 39 """Test authorize view""" 40 41 def setUp(self) -> None: 42 super().setUp() 43 self.factory = RequestFactory() 44 45 def test_disallowed_grant_type(self): 46 """Test with disallowed grant type""" 47 OAuth2Provider.objects.create( 48 name=generate_id(), 49 client_id="test", 50 grant_types=[], 51 authorization_flow=create_test_flow(), 52 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 53 ) 54 with self.assertRaises(AuthorizeError) as cm: 55 request = self.factory.get( 56 "/", 57 data={ 58 "response_type": "code", 59 "client_id": "test", 60 "redirect_uri": "http://local.invalid/Foo", 61 }, 62 ) 63 OAuthAuthorizationParams.from_request(request) 64 self.assertEqual(cm.exception.error, "invalid_request") 65 66 def test_invalid_grant_type(self): 67 """Test with invalid grant type""" 68 OAuth2Provider.objects.create( 69 name=generate_id(), 70 client_id="test", 71 authorization_flow=create_test_flow(), 72 grant_types=[GrantType.AUTHORIZATION_CODE], 73 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 74 ) 75 with self.assertRaises(AuthorizeError) as cm: 76 request = self.factory.get( 77 "/", 78 data={ 79 "response_type": "invalid", 80 "client_id": "test", 81 "redirect_uri": "http://local.invalid/Foo", 82 }, 83 ) 84 OAuthAuthorizationParams.from_request(request) 85 self.assertEqual(cm.exception.error, "unsupported_response_type") 86 87 def test_invalid_client_id(self): 88 """Test invalid client ID""" 89 with self.assertRaises(ClientIdError): 90 request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"}) 91 OAuthAuthorizationParams.from_request(request) 92 93 def test_request(self): 94 """test request param""" 95 OAuth2Provider.objects.create( 96 name=generate_id(), 97 client_id="test", 98 authorization_flow=create_test_flow(), 99 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 100 grant_types=[GrantType.AUTHORIZATION_CODE], 101 ) 102 with self.assertRaises(AuthorizeError) as cm: 103 request = self.factory.get( 104 "/", 105 data={ 106 "response_type": "code", 107 "client_id": "test", 108 "redirect_uri": "http://local.invalid/Foo", 109 "request": "foo", 110 }, 111 ) 112 OAuthAuthorizationParams.from_request(request) 113 self.assertEqual(cm.exception.error, "request_not_supported") 114 115 def test_invalid_redirect_uri_missing(self): 116 """test missing redirect URI""" 117 OAuth2Provider.objects.create( 118 name=generate_id(), 119 client_id="test", 120 authorization_flow=create_test_flow(), 121 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 122 ) 123 with self.assertRaises(RedirectUriError) as cm: 124 request = self.factory.get("/", data={"response_type": "code", "client_id": "test"}) 125 OAuthAuthorizationParams.from_request(request) 126 self.assertEqual(cm.exception.cause, "redirect_uri_missing") 127 128 def test_invalid_redirect_uri(self): 129 """test invalid redirect URI""" 130 OAuth2Provider.objects.create( 131 name=generate_id(), 132 client_id="test", 133 authorization_flow=create_test_flow(), 134 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 135 ) 136 with self.assertRaises(RedirectUriError) as cm: 137 request = self.factory.get( 138 "/", 139 data={ 140 "response_type": "code", 141 "client_id": "test", 142 "redirect_uri": "http://localhost", 143 }, 144 ) 145 OAuthAuthorizationParams.from_request(request) 146 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 147 148 def test_blocked_redirect_uri(self): 149 """test missing/invalid redirect URI""" 150 OAuth2Provider.objects.create( 151 name=generate_id(), 152 client_id="test", 153 authorization_flow=create_test_flow(), 154 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")], 155 ) 156 with self.assertRaises(RedirectUriError) as cm: 157 request = self.factory.get( 158 "/", 159 data={ 160 "response_type": "code", 161 "client_id": "test", 162 "redirect_uri": "data:localhost", 163 }, 164 ) 165 OAuthAuthorizationParams.from_request(request) 166 self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme") 167 168 def test_invalid_redirect_uri_regex(self): 169 """test missing/invalid redirect URI""" 170 OAuth2Provider.objects.create( 171 name=generate_id(), 172 client_id="test", 173 authorization_flow=create_test_flow(), 174 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")], 175 ) 176 with self.assertRaises(RedirectUriError) as cm: 177 request = self.factory.get( 178 "/", 179 data={ 180 "response_type": "code", 181 "client_id": "test", 182 "redirect_uri": "http://localhost", 183 }, 184 ) 185 OAuthAuthorizationParams.from_request(request) 186 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 187 188 def test_redirect_uri_invalid_regex(self): 189 """test missing/invalid redirect URI (invalid regex)""" 190 OAuth2Provider.objects.create( 191 name=generate_id(), 192 client_id="test", 193 authorization_flow=create_test_flow(), 194 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")], 195 ) 196 with self.assertRaises(RedirectUriError) as cm: 197 request = self.factory.get( 198 "/", 199 data={ 200 "response_type": "code", 201 "client_id": "test", 202 "redirect_uri": "http://localhost", 203 }, 204 ) 205 OAuthAuthorizationParams.from_request(request) 206 self.assertEqual(cm.exception.cause, "redirect_uri_no_match") 207 208 def test_redirect_uri_regex(self): 209 """test valid redirect URI (regex)""" 210 OAuth2Provider.objects.create( 211 name=generate_id(), 212 client_id="test", 213 authorization_flow=create_test_flow(), 214 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")], 215 grant_types=[GrantType.AUTHORIZATION_CODE], 216 ) 217 request = self.factory.get( 218 "/", 219 data={ 220 "response_type": "code", 221 "client_id": "test", 222 "redirect_uri": "http://foo.bar.baz", 223 }, 224 ) 225 OAuthAuthorizationParams.from_request(request) 226 227 @apply_blueprint("system/providers-oauth2.yaml") 228 def test_response_type(self): 229 """test response_type""" 230 provider = OAuth2Provider.objects.create( 231 name=generate_id(), 232 client_id="test", 233 authorization_flow=create_test_flow(), 234 grant_types=[GrantType.AUTHORIZATION_CODE], 235 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 236 ) 237 provider.property_mappings.set( 238 ScopeMapping.objects.filter( 239 managed__in=[ 240 "goauthentik.io/providers/oauth2/scope-openid", 241 "goauthentik.io/providers/oauth2/scope-email", 242 "goauthentik.io/providers/oauth2/scope-profile", 243 ] 244 ) 245 ) 246 request = self.factory.get( 247 "/", 248 data={ 249 "response_type": "code", 250 "client_id": "test", 251 "redirect_uri": "http://local.invalid/Foo", 252 }, 253 ) 254 self.assertEqual( 255 OAuthAuthorizationParams.from_request(request).grant_type, 256 GrantType.AUTHORIZATION_CODE, 257 ) 258 self.assertEqual( 259 OAuthAuthorizationParams.from_request(request).redirect_uri, 260 "http://local.invalid/Foo", 261 ) 262 provider.grant_types = [GrantType.IMPLICIT] 263 provider.save() 264 request = self.factory.get( 265 "/", 266 data={ 267 "response_type": "id_token", 268 "client_id": "test", 269 "redirect_uri": "http://local.invalid/Foo", 270 "scope": "openid", 271 "state": "foo", 272 "nonce": generate_id(), 273 }, 274 ) 275 self.assertEqual( 276 OAuthAuthorizationParams.from_request(request).grant_type, 277 GrantType.IMPLICIT, 278 ) 279 # Implicit without openid scope 280 with self.assertRaises(AuthorizeError) as cm: 281 request = self.factory.get( 282 "/", 283 data={ 284 "response_type": "id_token", 285 "client_id": "test", 286 "redirect_uri": "http://local.invalid/Foo", 287 "state": "foo", 288 }, 289 ) 290 self.assertEqual( 291 OAuthAuthorizationParams.from_request(request).grant_type, 292 GrantType.IMPLICIT, 293 ) 294 provider.grant_types = [GrantType.HYBRID] 295 provider.save() 296 request = self.factory.get( 297 "/", 298 data={ 299 "response_type": "code token", 300 "client_id": "test", 301 "redirect_uri": "http://local.invalid/Foo", 302 "scope": "openid", 303 "state": "foo", 304 }, 305 ) 306 self.assertEqual( 307 OAuthAuthorizationParams.from_request(request).grant_type, GrantType.HYBRID 308 ) 309 with self.assertRaises(AuthorizeError) as cm: 310 request = self.factory.get( 311 "/", 312 data={ 313 "response_type": "invalid", 314 "client_id": "test", 315 "redirect_uri": "http://local.invalid/Foo", 316 }, 317 ) 318 OAuthAuthorizationParams.from_request(request) 319 self.assertEqual(cm.exception.error, "unsupported_response_type") 320 321 def test_full_code(self): 322 """Test full authorization""" 323 flow = create_test_flow() 324 provider = OAuth2Provider.objects.create( 325 name=generate_id(), 326 client_id="test", 327 authorization_flow=flow, 328 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 329 access_code_validity="seconds=100", 330 grant_types=[GrantType.AUTHORIZATION_CODE], 331 ) 332 Application.objects.create(name="app", slug="app", provider=provider) 333 state = generate_id() 334 user = create_test_admin_user() 335 self.client.force_login(user) 336 # Step 1, initiate params and get redirect to flow 337 response = self.client.get( 338 reverse("authentik_providers_oauth2:authorize"), 339 data={ 340 "response_type": "code", 341 "client_id": "test", 342 "state": state, 343 "redirect_uri": "foo://localhost", 344 }, 345 ) 346 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 347 self.assertEqual( 348 response.url, 349 f"foo://localhost?code={code.code}&state={state}", 350 ) 351 self.assertAlmostEqual( 352 code.expires.timestamp() - now().timestamp(), 353 timedelta_from_string(provider.access_code_validity).total_seconds(), 354 delta=5, 355 ) 356 357 @apply_blueprint("system/providers-oauth2.yaml") 358 def test_full_implicit(self): 359 """Test full authorization""" 360 flow = create_test_flow() 361 provider: OAuth2Provider = OAuth2Provider.objects.create( 362 name=generate_id(), 363 client_id="test", 364 authorization_flow=flow, 365 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 366 signing_key=self.keypair, 367 grant_types=[GrantType.IMPLICIT], 368 ) 369 provider.property_mappings.set( 370 ScopeMapping.objects.filter( 371 managed__in=[ 372 "goauthentik.io/providers/oauth2/scope-openid", 373 "goauthentik.io/providers/oauth2/scope-email", 374 "goauthentik.io/providers/oauth2/scope-profile", 375 ] 376 ) 377 ) 378 provider.property_mappings.add( 379 ScopeMapping.objects.create( 380 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 381 ) 382 ) 383 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 384 state = generate_id() 385 user = create_test_admin_user() 386 self.client.force_login(user) 387 with patch( 388 "authentik.providers.oauth2.id_token.get_login_event", 389 MagicMock( 390 return_value=Event( 391 action=EventAction.LOGIN, 392 context={PLAN_CONTEXT_METHOD: "password"}, 393 created=now(), 394 ) 395 ), 396 ): 397 # Step 1, initiate params and get redirect to flow 398 response = self.client.get( 399 reverse("authentik_providers_oauth2:authorize"), 400 data={ 401 "response_type": "id_token", 402 "client_id": "test", 403 "state": state, 404 "scope": "openid test", 405 "redirect_uri": "http://localhost", 406 "nonce": generate_id(), 407 }, 408 ) 409 token = AccessToken.objects.filter(user=user).first() 410 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 411 self.assertEqual( 412 response.url, 413 ( 414 f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}" 415 f"&token_type={TOKEN_TYPE}" 416 f"&expires_in={int(expires)}&state={state}" 417 ), 418 ) 419 jwt = self.validate_jwt(token, provider) 420 self.assertEqual(jwt["amr"], ["pwd"]) 421 self.assertEqual(jwt["sub"], "foo") 422 self.assertAlmostEqual( 423 jwt["exp"] - now().timestamp(), 424 expires, 425 delta=5, 426 ) 427 428 @apply_blueprint("system/providers-oauth2.yaml") 429 def test_full_implicit_enc(self): 430 """Test full authorization with encryption""" 431 flow = create_test_flow() 432 provider: OAuth2Provider = OAuth2Provider.objects.create( 433 name=generate_id(), 434 client_id="test", 435 authorization_flow=flow, 436 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 437 signing_key=self.keypair, 438 encryption_key=self.keypair, 439 grant_types=[GrantType.IMPLICIT], 440 ) 441 provider.property_mappings.set( 442 ScopeMapping.objects.filter( 443 managed__in=[ 444 "goauthentik.io/providers/oauth2/scope-openid", 445 "goauthentik.io/providers/oauth2/scope-email", 446 "goauthentik.io/providers/oauth2/scope-profile", 447 ] 448 ) 449 ) 450 provider.property_mappings.add( 451 ScopeMapping.objects.create( 452 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 453 ) 454 ) 455 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 456 state = generate_id() 457 user = create_test_admin_user() 458 self.client.force_login(user) 459 with patch( 460 "authentik.providers.oauth2.id_token.get_login_event", 461 MagicMock( 462 return_value=Event( 463 action=EventAction.LOGIN, 464 context={PLAN_CONTEXT_METHOD: "password"}, 465 created=now(), 466 ) 467 ), 468 ): 469 # Step 1, initiate params and get redirect to flow 470 response = self.client.get( 471 reverse("authentik_providers_oauth2:authorize"), 472 data={ 473 "response_type": "id_token", 474 "client_id": "test", 475 "state": state, 476 "scope": "openid test", 477 "redirect_uri": "http://localhost", 478 "nonce": generate_id(), 479 }, 480 ) 481 self.assertEqual(response.status_code, 302) 482 token = AccessToken.objects.filter(user=user).first() 483 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 484 jwt = self.validate_jwe(token, provider) 485 self.assertEqual(jwt["amr"], ["pwd"]) 486 self.assertEqual(jwt["sub"], "foo") 487 self.assertAlmostEqual( 488 jwt["exp"] - now().timestamp(), 489 expires, 490 delta=5, 491 ) 492 493 def test_full_fragment_code(self): 494 """Test full authorization""" 495 flow = create_test_flow() 496 provider: OAuth2Provider = OAuth2Provider.objects.create( 497 name=generate_id(), 498 client_id="test", 499 authorization_flow=flow, 500 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 501 signing_key=self.keypair, 502 grant_types=[GrantType.AUTHORIZATION_CODE], 503 ) 504 Application.objects.create(name="app", slug="app", provider=provider) 505 state = generate_id() 506 user = create_test_admin_user() 507 self.client.force_login(user) 508 with patch( 509 "authentik.providers.oauth2.id_token.get_login_event", 510 MagicMock( 511 return_value=Event( 512 action=EventAction.LOGIN, 513 context={PLAN_CONTEXT_METHOD: "password"}, 514 created=now(), 515 ) 516 ), 517 ): 518 # Step 1, initiate params and get redirect to flow 519 response = self.client.get( 520 reverse("authentik_providers_oauth2:authorize"), 521 data={ 522 "response_type": "code", 523 "response_mode": "fragment", 524 "client_id": "test", 525 "state": state, 526 "scope": "openid", 527 "redirect_uri": "http://localhost", 528 "nonce": generate_id(), 529 }, 530 ) 531 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 532 self.assertEqual( 533 response.url, 534 f"http://localhost#code={code.code}&state={state}", 535 ) 536 self.assertAlmostEqual( 537 code.expires.timestamp() - now().timestamp(), 538 timedelta_from_string(provider.access_code_validity).total_seconds(), 539 delta=5, 540 ) 541 542 @apply_blueprint("system/providers-oauth2.yaml") 543 def test_full_form_post_id_token(self): 544 """Test full authorization (form_post response)""" 545 flow = create_test_flow() 546 provider = OAuth2Provider.objects.create( 547 name=generate_id(), 548 client_id=generate_id(), 549 authorization_flow=flow, 550 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 551 signing_key=self.keypair, 552 grant_types=[GrantType.IMPLICIT], 553 ) 554 provider.property_mappings.set( 555 ScopeMapping.objects.filter( 556 managed__in=[ 557 "goauthentik.io/providers/oauth2/scope-openid", 558 "goauthentik.io/providers/oauth2/scope-email", 559 "goauthentik.io/providers/oauth2/scope-profile", 560 ] 561 ) 562 ) 563 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 564 state = generate_id() 565 user = create_test_admin_user() 566 self.client.force_login(user) 567 # Step 1, initiate params and get redirect to flow 568 self.client.get( 569 reverse("authentik_providers_oauth2:authorize"), 570 data={ 571 "response_type": "id_token", 572 "response_mode": "form_post", 573 "client_id": provider.client_id, 574 "state": state, 575 "scope": "openid", 576 "redirect_uri": "http://localhost", 577 "nonce": generate_id(), 578 }, 579 ) 580 response = self.client.get( 581 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 582 ) 583 token = AccessToken.objects.filter(user=user).first() 584 self.assertIsNotNone(token) 585 self.assertJSONEqual( 586 response.content.decode(), 587 { 588 "component": "ak-stage-autosubmit", 589 "url": "http://localhost", 590 "title": f"Redirecting to {app.name}...", 591 "attrs": { 592 "id_token": provider.encode(token.id_token.to_dict()), 593 "token_type": TOKEN_TYPE, 594 "expires_in": "3600", 595 "state": state, 596 }, 597 }, 598 ) 599 self.validate_jwt(token, provider) 600 601 def test_full_form_post_code(self): 602 """Test full authorization (form_post response, code type)""" 603 flow = create_test_flow() 604 provider = OAuth2Provider.objects.create( 605 name=generate_id(), 606 client_id=generate_id(), 607 authorization_flow=flow, 608 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 609 signing_key=self.keypair, 610 grant_types=[GrantType.AUTHORIZATION_CODE], 611 ) 612 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 613 state = generate_id() 614 user = create_test_admin_user() 615 self.client.force_login(user) 616 # Step 1, initiate params and get redirect to flow 617 self.client.get( 618 reverse("authentik_providers_oauth2:authorize"), 619 data={ 620 "response_type": "code", 621 "response_mode": "form_post", 622 "client_id": provider.client_id, 623 "state": state, 624 "scope": "openid", 625 "redirect_uri": "http://localhost", 626 }, 627 ) 628 response = self.client.get( 629 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 630 ) 631 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 632 self.assertJSONEqual( 633 response.content.decode(), 634 { 635 "component": "ak-stage-autosubmit", 636 "url": "http://localhost", 637 "title": f"Redirecting to {app.name}...", 638 "attrs": { 639 "code": code.code, 640 "state": state, 641 }, 642 }, 643 ) 644 645 def test_openid_missing_invalid(self): 646 """test request requiring an OpenID scope to be set""" 647 OAuth2Provider.objects.create( 648 name=generate_id(), 649 client_id="test", 650 authorization_flow=create_test_flow(), 651 grant_types=[GrantType.IMPLICIT], 652 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 653 ) 654 request = self.factory.get( 655 "/", 656 data={ 657 "response_type": "id_token", 658 "client_id": "test", 659 "redirect_uri": "http://localhost", 660 "scope": "", 661 }, 662 ) 663 with self.assertRaises(AuthorizeError) as cm: 664 OAuthAuthorizationParams.from_request(request) 665 self.assertEqual(cm.exception.cause, "scope_openid_missing") 666 667 @apply_blueprint("system/providers-oauth2.yaml") 668 def test_offline_access_invalid(self): 669 """test request for offline_access with invalid response type""" 670 provider = OAuth2Provider.objects.create( 671 name=generate_id(), 672 client_id="test", 673 authorization_flow=create_test_flow(), 674 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 675 grant_types=[GrantType.IMPLICIT], 676 ) 677 provider.property_mappings.set( 678 ScopeMapping.objects.filter( 679 managed__in=[ 680 "goauthentik.io/providers/oauth2/scope-openid", 681 "goauthentik.io/providers/oauth2/scope-offline_access", 682 ] 683 ) 684 ) 685 request = self.factory.get( 686 "/", 687 data={ 688 "response_type": "id_token", 689 "client_id": "test", 690 "redirect_uri": "http://localhost", 691 "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}", 692 "nonce": generate_id(), 693 }, 694 ) 695 parsed = OAuthAuthorizationParams.from_request(request) 696 self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope) 697 698 @apply_blueprint("default/flow-default-authentication-flow.yaml") 699 def test_ui_locales(self): 700 """Test OIDC ui_locales authorization""" 701 flow = create_test_flow() 702 provider = OAuth2Provider.objects.create( 703 name=generate_id(), 704 client_id="test", 705 authorization_flow=flow, 706 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 707 access_code_validity="seconds=100", 708 grant_types=[GrantType.AUTHORIZATION_CODE], 709 ) 710 Application.objects.create(name="app", slug="app", provider=provider) 711 state = generate_id() 712 self.client.logout() 713 try: 714 response = self.client.get( 715 reverse("authentik_providers_oauth2:authorize"), 716 data={ 717 "response_type": "code", 718 "client_id": "test", 719 "state": state, 720 "redirect_uri": "foo://localhost", 721 "ui_locales": "invalid fr", 722 }, 723 ) 724 parsed = parse_qs(urlparse(response.url).query) 725 self.assertEqual(parsed["locale"], ["fr"]) 726 finally: 727 translation.deactivate() 728 729 @apply_blueprint("default/flow-default-authentication-flow.yaml") 730 def test_ui_locales_invalid(self): 731 """Test OIDC ui_locales authorization""" 732 flow = create_test_flow() 733 provider = OAuth2Provider.objects.create( 734 name=generate_id(), 735 client_id="test", 736 authorization_flow=flow, 737 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 738 access_code_validity="seconds=100", 739 grant_types=[GrantType.AUTHORIZATION_CODE], 740 ) 741 Application.objects.create(name="app", slug="app", provider=provider) 742 state = generate_id() 743 self.client.logout() 744 response = self.client.get( 745 reverse("authentik_providers_oauth2:authorize"), 746 data={ 747 "response_type": "code", 748 "client_id": "test", 749 "state": state, 750 "redirect_uri": "foo://localhost", 751 "ui_locales": "invalid", 752 }, 753 ) 754 parsed = parse_qs(urlparse(response.url).query) 755 self.assertNotIn("locale", parsed) 756 757 def test_authentication_flow(self): 758 """Test custom authentication flow""" 759 brand = create_test_brand() 760 global_auth = create_test_flow() 761 FlowStageBinding.objects.create( 762 target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10 763 ) 764 brand.flow_authentication = global_auth 765 brand.save() 766 767 flow = create_test_flow() 768 auth_flow = create_test_flow() 769 FlowStageBinding.objects.create( 770 target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10 771 ) 772 provider = OAuth2Provider.objects.create( 773 name=generate_id(), 774 client_id="test", 775 authorization_flow=flow, 776 authentication_flow=auth_flow, 777 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 778 access_code_validity="seconds=100", 779 grant_types=[GrantType.AUTHORIZATION_CODE], 780 ) 781 Application.objects.create(name="app", slug="app", provider=provider) 782 state = generate_id() 783 response = self.client.get( 784 reverse("authentik_providers_oauth2:authorize"), 785 data={ 786 "response_type": "code", 787 "client_id": "test", 788 "state": state, 789 "redirect_uri": "foo://localhost", 790 }, 791 ) 792 self.assertEqual(response.status_code, 302) 793 self.assertIn(auth_flow.slug, response.url) 794 self.assertNotIn(global_auth.slug, response.url) 795 796 @apply_blueprint("default/flow-default-authentication-flow.yaml") 797 def test_login_hint(self): 798 """Login hint""" 799 flow = create_test_flow() 800 provider = OAuth2Provider.objects.create( 801 name=generate_id(), 802 client_id="test", 803 authorization_flow=flow, 804 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 805 access_code_validity="seconds=100", 806 grant_types=[GrantType.AUTHORIZATION_CODE], 807 ) 808 Application.objects.create(name="app", slug="app", provider=provider) 809 state = generate_id() 810 response = self.client.get( 811 reverse("authentik_providers_oauth2:authorize"), 812 data={ 813 "response_type": "code", 814 "client_id": "test", 815 "state": state, 816 "redirect_uri": "foo://localhost", 817 "login_hint": "foo", 818 }, 819 ) 820 self.assertEqual(response.status_code, 302) 821 plan = self.client.session.get(SESSION_KEY_PLAN) 822 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
Test authorize view
45 def test_disallowed_grant_type(self): 46 """Test with disallowed grant type""" 47 OAuth2Provider.objects.create( 48 name=generate_id(), 49 client_id="test", 50 grant_types=[], 51 authorization_flow=create_test_flow(), 52 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 53 ) 54 with self.assertRaises(AuthorizeError) as cm: 55 request = self.factory.get( 56 "/", 57 data={ 58 "response_type": "code", 59 "client_id": "test", 60 "redirect_uri": "http://local.invalid/Foo", 61 }, 62 ) 63 OAuthAuthorizationParams.from_request(request) 64 self.assertEqual(cm.exception.error, "invalid_request")
Test with disallowed grant type
66 def test_invalid_grant_type(self): 67 """Test with invalid grant type""" 68 OAuth2Provider.objects.create( 69 name=generate_id(), 70 client_id="test", 71 authorization_flow=create_test_flow(), 72 grant_types=[GrantType.AUTHORIZATION_CODE], 73 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 74 ) 75 with self.assertRaises(AuthorizeError) as cm: 76 request = self.factory.get( 77 "/", 78 data={ 79 "response_type": "invalid", 80 "client_id": "test", 81 "redirect_uri": "http://local.invalid/Foo", 82 }, 83 ) 84 OAuthAuthorizationParams.from_request(request) 85 self.assertEqual(cm.exception.error, "unsupported_response_type")
Test with invalid grant type
87 def test_invalid_client_id(self): 88 """Test invalid client ID""" 89 with self.assertRaises(ClientIdError): 90 request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"}) 91 OAuthAuthorizationParams.from_request(request)
Test invalid client ID
93 def test_request(self): 94 """test request param""" 95 OAuth2Provider.objects.create( 96 name=generate_id(), 97 client_id="test", 98 authorization_flow=create_test_flow(), 99 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 100 grant_types=[GrantType.AUTHORIZATION_CODE], 101 ) 102 with self.assertRaises(AuthorizeError) as cm: 103 request = self.factory.get( 104 "/", 105 data={ 106 "response_type": "code", 107 "client_id": "test", 108 "redirect_uri": "http://local.invalid/Foo", 109 "request": "foo", 110 }, 111 ) 112 OAuthAuthorizationParams.from_request(request) 113 self.assertEqual(cm.exception.error, "request_not_supported")
test request param
115 def test_invalid_redirect_uri_missing(self): 116 """test missing redirect URI""" 117 OAuth2Provider.objects.create( 118 name=generate_id(), 119 client_id="test", 120 authorization_flow=create_test_flow(), 121 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 122 ) 123 with self.assertRaises(RedirectUriError) as cm: 124 request = self.factory.get("/", data={"response_type": "code", "client_id": "test"}) 125 OAuthAuthorizationParams.from_request(request) 126 self.assertEqual(cm.exception.cause, "redirect_uri_missing")
test missing redirect URI
128 def test_invalid_redirect_uri(self): 129 """test invalid redirect URI""" 130 OAuth2Provider.objects.create( 131 name=generate_id(), 132 client_id="test", 133 authorization_flow=create_test_flow(), 134 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 135 ) 136 with self.assertRaises(RedirectUriError) as cm: 137 request = self.factory.get( 138 "/", 139 data={ 140 "response_type": "code", 141 "client_id": "test", 142 "redirect_uri": "http://localhost", 143 }, 144 ) 145 OAuthAuthorizationParams.from_request(request) 146 self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
test invalid redirect URI
148 def test_blocked_redirect_uri(self): 149 """test missing/invalid redirect URI""" 150 OAuth2Provider.objects.create( 151 name=generate_id(), 152 client_id="test", 153 authorization_flow=create_test_flow(), 154 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")], 155 ) 156 with self.assertRaises(RedirectUriError) as cm: 157 request = self.factory.get( 158 "/", 159 data={ 160 "response_type": "code", 161 "client_id": "test", 162 "redirect_uri": "data:localhost", 163 }, 164 ) 165 OAuthAuthorizationParams.from_request(request) 166 self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")
test missing/invalid redirect URI
168 def test_invalid_redirect_uri_regex(self): 169 """test missing/invalid redirect URI""" 170 OAuth2Provider.objects.create( 171 name=generate_id(), 172 client_id="test", 173 authorization_flow=create_test_flow(), 174 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")], 175 ) 176 with self.assertRaises(RedirectUriError) as cm: 177 request = self.factory.get( 178 "/", 179 data={ 180 "response_type": "code", 181 "client_id": "test", 182 "redirect_uri": "http://localhost", 183 }, 184 ) 185 OAuthAuthorizationParams.from_request(request) 186 self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
test missing/invalid redirect URI
188 def test_redirect_uri_invalid_regex(self): 189 """test missing/invalid redirect URI (invalid regex)""" 190 OAuth2Provider.objects.create( 191 name=generate_id(), 192 client_id="test", 193 authorization_flow=create_test_flow(), 194 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")], 195 ) 196 with self.assertRaises(RedirectUriError) as cm: 197 request = self.factory.get( 198 "/", 199 data={ 200 "response_type": "code", 201 "client_id": "test", 202 "redirect_uri": "http://localhost", 203 }, 204 ) 205 OAuthAuthorizationParams.from_request(request) 206 self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
test missing/invalid redirect URI (invalid regex)
208 def test_redirect_uri_regex(self): 209 """test valid redirect URI (regex)""" 210 OAuth2Provider.objects.create( 211 name=generate_id(), 212 client_id="test", 213 authorization_flow=create_test_flow(), 214 redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")], 215 grant_types=[GrantType.AUTHORIZATION_CODE], 216 ) 217 request = self.factory.get( 218 "/", 219 data={ 220 "response_type": "code", 221 "client_id": "test", 222 "redirect_uri": "http://foo.bar.baz", 223 }, 224 ) 225 OAuthAuthorizationParams.from_request(request)
test valid redirect URI (regex)
227 @apply_blueprint("system/providers-oauth2.yaml") 228 def test_response_type(self): 229 """test response_type""" 230 provider = OAuth2Provider.objects.create( 231 name=generate_id(), 232 client_id="test", 233 authorization_flow=create_test_flow(), 234 grant_types=[GrantType.AUTHORIZATION_CODE], 235 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")], 236 ) 237 provider.property_mappings.set( 238 ScopeMapping.objects.filter( 239 managed__in=[ 240 "goauthentik.io/providers/oauth2/scope-openid", 241 "goauthentik.io/providers/oauth2/scope-email", 242 "goauthentik.io/providers/oauth2/scope-profile", 243 ] 244 ) 245 ) 246 request = self.factory.get( 247 "/", 248 data={ 249 "response_type": "code", 250 "client_id": "test", 251 "redirect_uri": "http://local.invalid/Foo", 252 }, 253 ) 254 self.assertEqual( 255 OAuthAuthorizationParams.from_request(request).grant_type, 256 GrantType.AUTHORIZATION_CODE, 257 ) 258 self.assertEqual( 259 OAuthAuthorizationParams.from_request(request).redirect_uri, 260 "http://local.invalid/Foo", 261 ) 262 provider.grant_types = [GrantType.IMPLICIT] 263 provider.save() 264 request = self.factory.get( 265 "/", 266 data={ 267 "response_type": "id_token", 268 "client_id": "test", 269 "redirect_uri": "http://local.invalid/Foo", 270 "scope": "openid", 271 "state": "foo", 272 "nonce": generate_id(), 273 }, 274 ) 275 self.assertEqual( 276 OAuthAuthorizationParams.from_request(request).grant_type, 277 GrantType.IMPLICIT, 278 ) 279 # Implicit without openid scope 280 with self.assertRaises(AuthorizeError) as cm: 281 request = self.factory.get( 282 "/", 283 data={ 284 "response_type": "id_token", 285 "client_id": "test", 286 "redirect_uri": "http://local.invalid/Foo", 287 "state": "foo", 288 }, 289 ) 290 self.assertEqual( 291 OAuthAuthorizationParams.from_request(request).grant_type, 292 GrantType.IMPLICIT, 293 ) 294 provider.grant_types = [GrantType.HYBRID] 295 provider.save() 296 request = self.factory.get( 297 "/", 298 data={ 299 "response_type": "code token", 300 "client_id": "test", 301 "redirect_uri": "http://local.invalid/Foo", 302 "scope": "openid", 303 "state": "foo", 304 }, 305 ) 306 self.assertEqual( 307 OAuthAuthorizationParams.from_request(request).grant_type, GrantType.HYBRID 308 ) 309 with self.assertRaises(AuthorizeError) as cm: 310 request = self.factory.get( 311 "/", 312 data={ 313 "response_type": "invalid", 314 "client_id": "test", 315 "redirect_uri": "http://local.invalid/Foo", 316 }, 317 ) 318 OAuthAuthorizationParams.from_request(request) 319 self.assertEqual(cm.exception.error, "unsupported_response_type")
test response_type
321 def test_full_code(self): 322 """Test full authorization""" 323 flow = create_test_flow() 324 provider = OAuth2Provider.objects.create( 325 name=generate_id(), 326 client_id="test", 327 authorization_flow=flow, 328 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 329 access_code_validity="seconds=100", 330 grant_types=[GrantType.AUTHORIZATION_CODE], 331 ) 332 Application.objects.create(name="app", slug="app", provider=provider) 333 state = generate_id() 334 user = create_test_admin_user() 335 self.client.force_login(user) 336 # Step 1, initiate params and get redirect to flow 337 response = self.client.get( 338 reverse("authentik_providers_oauth2:authorize"), 339 data={ 340 "response_type": "code", 341 "client_id": "test", 342 "state": state, 343 "redirect_uri": "foo://localhost", 344 }, 345 ) 346 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 347 self.assertEqual( 348 response.url, 349 f"foo://localhost?code={code.code}&state={state}", 350 ) 351 self.assertAlmostEqual( 352 code.expires.timestamp() - now().timestamp(), 353 timedelta_from_string(provider.access_code_validity).total_seconds(), 354 delta=5, 355 )
Test full authorization
357 @apply_blueprint("system/providers-oauth2.yaml") 358 def test_full_implicit(self): 359 """Test full authorization""" 360 flow = create_test_flow() 361 provider: OAuth2Provider = OAuth2Provider.objects.create( 362 name=generate_id(), 363 client_id="test", 364 authorization_flow=flow, 365 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 366 signing_key=self.keypair, 367 grant_types=[GrantType.IMPLICIT], 368 ) 369 provider.property_mappings.set( 370 ScopeMapping.objects.filter( 371 managed__in=[ 372 "goauthentik.io/providers/oauth2/scope-openid", 373 "goauthentik.io/providers/oauth2/scope-email", 374 "goauthentik.io/providers/oauth2/scope-profile", 375 ] 376 ) 377 ) 378 provider.property_mappings.add( 379 ScopeMapping.objects.create( 380 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 381 ) 382 ) 383 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 384 state = generate_id() 385 user = create_test_admin_user() 386 self.client.force_login(user) 387 with patch( 388 "authentik.providers.oauth2.id_token.get_login_event", 389 MagicMock( 390 return_value=Event( 391 action=EventAction.LOGIN, 392 context={PLAN_CONTEXT_METHOD: "password"}, 393 created=now(), 394 ) 395 ), 396 ): 397 # Step 1, initiate params and get redirect to flow 398 response = self.client.get( 399 reverse("authentik_providers_oauth2:authorize"), 400 data={ 401 "response_type": "id_token", 402 "client_id": "test", 403 "state": state, 404 "scope": "openid test", 405 "redirect_uri": "http://localhost", 406 "nonce": generate_id(), 407 }, 408 ) 409 token = AccessToken.objects.filter(user=user).first() 410 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 411 self.assertEqual( 412 response.url, 413 ( 414 f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}" 415 f"&token_type={TOKEN_TYPE}" 416 f"&expires_in={int(expires)}&state={state}" 417 ), 418 ) 419 jwt = self.validate_jwt(token, provider) 420 self.assertEqual(jwt["amr"], ["pwd"]) 421 self.assertEqual(jwt["sub"], "foo") 422 self.assertAlmostEqual( 423 jwt["exp"] - now().timestamp(), 424 expires, 425 delta=5, 426 )
Test full authorization
428 @apply_blueprint("system/providers-oauth2.yaml") 429 def test_full_implicit_enc(self): 430 """Test full authorization with encryption""" 431 flow = create_test_flow() 432 provider: OAuth2Provider = OAuth2Provider.objects.create( 433 name=generate_id(), 434 client_id="test", 435 authorization_flow=flow, 436 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 437 signing_key=self.keypair, 438 encryption_key=self.keypair, 439 grant_types=[GrantType.IMPLICIT], 440 ) 441 provider.property_mappings.set( 442 ScopeMapping.objects.filter( 443 managed__in=[ 444 "goauthentik.io/providers/oauth2/scope-openid", 445 "goauthentik.io/providers/oauth2/scope-email", 446 "goauthentik.io/providers/oauth2/scope-profile", 447 ] 448 ) 449 ) 450 provider.property_mappings.add( 451 ScopeMapping.objects.create( 452 name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}""" 453 ) 454 ) 455 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 456 state = generate_id() 457 user = create_test_admin_user() 458 self.client.force_login(user) 459 with patch( 460 "authentik.providers.oauth2.id_token.get_login_event", 461 MagicMock( 462 return_value=Event( 463 action=EventAction.LOGIN, 464 context={PLAN_CONTEXT_METHOD: "password"}, 465 created=now(), 466 ) 467 ), 468 ): 469 # Step 1, initiate params and get redirect to flow 470 response = self.client.get( 471 reverse("authentik_providers_oauth2:authorize"), 472 data={ 473 "response_type": "id_token", 474 "client_id": "test", 475 "state": state, 476 "scope": "openid test", 477 "redirect_uri": "http://localhost", 478 "nonce": generate_id(), 479 }, 480 ) 481 self.assertEqual(response.status_code, 302) 482 token = AccessToken.objects.filter(user=user).first() 483 expires = timedelta_from_string(provider.access_token_validity).total_seconds() 484 jwt = self.validate_jwe(token, provider) 485 self.assertEqual(jwt["amr"], ["pwd"]) 486 self.assertEqual(jwt["sub"], "foo") 487 self.assertAlmostEqual( 488 jwt["exp"] - now().timestamp(), 489 expires, 490 delta=5, 491 )
Test full authorization with encryption
493 def test_full_fragment_code(self): 494 """Test full authorization""" 495 flow = create_test_flow() 496 provider: OAuth2Provider = OAuth2Provider.objects.create( 497 name=generate_id(), 498 client_id="test", 499 authorization_flow=flow, 500 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 501 signing_key=self.keypair, 502 grant_types=[GrantType.AUTHORIZATION_CODE], 503 ) 504 Application.objects.create(name="app", slug="app", provider=provider) 505 state = generate_id() 506 user = create_test_admin_user() 507 self.client.force_login(user) 508 with patch( 509 "authentik.providers.oauth2.id_token.get_login_event", 510 MagicMock( 511 return_value=Event( 512 action=EventAction.LOGIN, 513 context={PLAN_CONTEXT_METHOD: "password"}, 514 created=now(), 515 ) 516 ), 517 ): 518 # Step 1, initiate params and get redirect to flow 519 response = self.client.get( 520 reverse("authentik_providers_oauth2:authorize"), 521 data={ 522 "response_type": "code", 523 "response_mode": "fragment", 524 "client_id": "test", 525 "state": state, 526 "scope": "openid", 527 "redirect_uri": "http://localhost", 528 "nonce": generate_id(), 529 }, 530 ) 531 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 532 self.assertEqual( 533 response.url, 534 f"http://localhost#code={code.code}&state={state}", 535 ) 536 self.assertAlmostEqual( 537 code.expires.timestamp() - now().timestamp(), 538 timedelta_from_string(provider.access_code_validity).total_seconds(), 539 delta=5, 540 )
Test full authorization
542 @apply_blueprint("system/providers-oauth2.yaml") 543 def test_full_form_post_id_token(self): 544 """Test full authorization (form_post response)""" 545 flow = create_test_flow() 546 provider = OAuth2Provider.objects.create( 547 name=generate_id(), 548 client_id=generate_id(), 549 authorization_flow=flow, 550 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 551 signing_key=self.keypair, 552 grant_types=[GrantType.IMPLICIT], 553 ) 554 provider.property_mappings.set( 555 ScopeMapping.objects.filter( 556 managed__in=[ 557 "goauthentik.io/providers/oauth2/scope-openid", 558 "goauthentik.io/providers/oauth2/scope-email", 559 "goauthentik.io/providers/oauth2/scope-profile", 560 ] 561 ) 562 ) 563 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 564 state = generate_id() 565 user = create_test_admin_user() 566 self.client.force_login(user) 567 # Step 1, initiate params and get redirect to flow 568 self.client.get( 569 reverse("authentik_providers_oauth2:authorize"), 570 data={ 571 "response_type": "id_token", 572 "response_mode": "form_post", 573 "client_id": provider.client_id, 574 "state": state, 575 "scope": "openid", 576 "redirect_uri": "http://localhost", 577 "nonce": generate_id(), 578 }, 579 ) 580 response = self.client.get( 581 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 582 ) 583 token = AccessToken.objects.filter(user=user).first() 584 self.assertIsNotNone(token) 585 self.assertJSONEqual( 586 response.content.decode(), 587 { 588 "component": "ak-stage-autosubmit", 589 "url": "http://localhost", 590 "title": f"Redirecting to {app.name}...", 591 "attrs": { 592 "id_token": provider.encode(token.id_token.to_dict()), 593 "token_type": TOKEN_TYPE, 594 "expires_in": "3600", 595 "state": state, 596 }, 597 }, 598 ) 599 self.validate_jwt(token, provider)
Test full authorization (form_post response)
601 def test_full_form_post_code(self): 602 """Test full authorization (form_post response, code type)""" 603 flow = create_test_flow() 604 provider = OAuth2Provider.objects.create( 605 name=generate_id(), 606 client_id=generate_id(), 607 authorization_flow=flow, 608 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 609 signing_key=self.keypair, 610 grant_types=[GrantType.AUTHORIZATION_CODE], 611 ) 612 app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 613 state = generate_id() 614 user = create_test_admin_user() 615 self.client.force_login(user) 616 # Step 1, initiate params and get redirect to flow 617 self.client.get( 618 reverse("authentik_providers_oauth2:authorize"), 619 data={ 620 "response_type": "code", 621 "response_mode": "form_post", 622 "client_id": provider.client_id, 623 "state": state, 624 "scope": "openid", 625 "redirect_uri": "http://localhost", 626 }, 627 ) 628 response = self.client.get( 629 reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), 630 ) 631 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 632 self.assertJSONEqual( 633 response.content.decode(), 634 { 635 "component": "ak-stage-autosubmit", 636 "url": "http://localhost", 637 "title": f"Redirecting to {app.name}...", 638 "attrs": { 639 "code": code.code, 640 "state": state, 641 }, 642 }, 643 )
Test full authorization (form_post response, code type)
645 def test_openid_missing_invalid(self): 646 """test request requiring an OpenID scope to be set""" 647 OAuth2Provider.objects.create( 648 name=generate_id(), 649 client_id="test", 650 authorization_flow=create_test_flow(), 651 grant_types=[GrantType.IMPLICIT], 652 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 653 ) 654 request = self.factory.get( 655 "/", 656 data={ 657 "response_type": "id_token", 658 "client_id": "test", 659 "redirect_uri": "http://localhost", 660 "scope": "", 661 }, 662 ) 663 with self.assertRaises(AuthorizeError) as cm: 664 OAuthAuthorizationParams.from_request(request) 665 self.assertEqual(cm.exception.cause, "scope_openid_missing")
test request requiring an OpenID scope to be set
667 @apply_blueprint("system/providers-oauth2.yaml") 668 def test_offline_access_invalid(self): 669 """test request for offline_access with invalid response type""" 670 provider = OAuth2Provider.objects.create( 671 name=generate_id(), 672 client_id="test", 673 authorization_flow=create_test_flow(), 674 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 675 grant_types=[GrantType.IMPLICIT], 676 ) 677 provider.property_mappings.set( 678 ScopeMapping.objects.filter( 679 managed__in=[ 680 "goauthentik.io/providers/oauth2/scope-openid", 681 "goauthentik.io/providers/oauth2/scope-offline_access", 682 ] 683 ) 684 ) 685 request = self.factory.get( 686 "/", 687 data={ 688 "response_type": "id_token", 689 "client_id": "test", 690 "redirect_uri": "http://localhost", 691 "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}", 692 "nonce": generate_id(), 693 }, 694 ) 695 parsed = OAuthAuthorizationParams.from_request(request) 696 self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)
test request for offline_access with invalid response type
698 @apply_blueprint("default/flow-default-authentication-flow.yaml") 699 def test_ui_locales(self): 700 """Test OIDC ui_locales authorization""" 701 flow = create_test_flow() 702 provider = OAuth2Provider.objects.create( 703 name=generate_id(), 704 client_id="test", 705 authorization_flow=flow, 706 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 707 access_code_validity="seconds=100", 708 grant_types=[GrantType.AUTHORIZATION_CODE], 709 ) 710 Application.objects.create(name="app", slug="app", provider=provider) 711 state = generate_id() 712 self.client.logout() 713 try: 714 response = self.client.get( 715 reverse("authentik_providers_oauth2:authorize"), 716 data={ 717 "response_type": "code", 718 "client_id": "test", 719 "state": state, 720 "redirect_uri": "foo://localhost", 721 "ui_locales": "invalid fr", 722 }, 723 ) 724 parsed = parse_qs(urlparse(response.url).query) 725 self.assertEqual(parsed["locale"], ["fr"]) 726 finally: 727 translation.deactivate()
Test OIDC ui_locales authorization
729 @apply_blueprint("default/flow-default-authentication-flow.yaml") 730 def test_ui_locales_invalid(self): 731 """Test OIDC ui_locales authorization""" 732 flow = create_test_flow() 733 provider = OAuth2Provider.objects.create( 734 name=generate_id(), 735 client_id="test", 736 authorization_flow=flow, 737 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 738 access_code_validity="seconds=100", 739 grant_types=[GrantType.AUTHORIZATION_CODE], 740 ) 741 Application.objects.create(name="app", slug="app", provider=provider) 742 state = generate_id() 743 self.client.logout() 744 response = self.client.get( 745 reverse("authentik_providers_oauth2:authorize"), 746 data={ 747 "response_type": "code", 748 "client_id": "test", 749 "state": state, 750 "redirect_uri": "foo://localhost", 751 "ui_locales": "invalid", 752 }, 753 ) 754 parsed = parse_qs(urlparse(response.url).query) 755 self.assertNotIn("locale", parsed)
Test OIDC ui_locales authorization
757 def test_authentication_flow(self): 758 """Test custom authentication flow""" 759 brand = create_test_brand() 760 global_auth = create_test_flow() 761 FlowStageBinding.objects.create( 762 target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10 763 ) 764 brand.flow_authentication = global_auth 765 brand.save() 766 767 flow = create_test_flow() 768 auth_flow = create_test_flow() 769 FlowStageBinding.objects.create( 770 target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10 771 ) 772 provider = OAuth2Provider.objects.create( 773 name=generate_id(), 774 client_id="test", 775 authorization_flow=flow, 776 authentication_flow=auth_flow, 777 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 778 access_code_validity="seconds=100", 779 grant_types=[GrantType.AUTHORIZATION_CODE], 780 ) 781 Application.objects.create(name="app", slug="app", provider=provider) 782 state = generate_id() 783 response = self.client.get( 784 reverse("authentik_providers_oauth2:authorize"), 785 data={ 786 "response_type": "code", 787 "client_id": "test", 788 "state": state, 789 "redirect_uri": "foo://localhost", 790 }, 791 ) 792 self.assertEqual(response.status_code, 302) 793 self.assertIn(auth_flow.slug, response.url) 794 self.assertNotIn(global_auth.slug, response.url)
Test custom authentication flow
796 @apply_blueprint("default/flow-default-authentication-flow.yaml") 797 def test_login_hint(self): 798 """Login hint""" 799 flow = create_test_flow() 800 provider = OAuth2Provider.objects.create( 801 name=generate_id(), 802 client_id="test", 803 authorization_flow=flow, 804 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 805 access_code_validity="seconds=100", 806 grant_types=[GrantType.AUTHORIZATION_CODE], 807 ) 808 Application.objects.create(name="app", slug="app", provider=provider) 809 state = generate_id() 810 response = self.client.get( 811 reverse("authentik_providers_oauth2:authorize"), 812 data={ 813 "response_type": "code", 814 "client_id": "test", 815 "state": state, 816 "redirect_uri": "foo://localhost", 817 "login_hint": "foo", 818 }, 819 ) 820 self.assertEqual(response.status_code, 302) 821 plan = self.client.session.get(SESSION_KEY_PLAN) 822 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
Login hint