authentik.providers.oauth2.tests.test_token
Test token view
"""Test token view"""

from base64 import b64encode
from datetime import timedelta
from json import dumps
from urllib.parse import quote

from django.test import RequestFactory
from django.urls import reverse
from django.utils import timezone
from django.utils.timezone import now
from freezegun import freeze_time

from authentik.blueprints.tests import apply_blueprint
from authentik.common.oauth.constants import (
    GRANT_TYPE_AUTHORIZATION_CODE,
    GRANT_TYPE_REFRESH_TOKEN,
    TOKEN_TYPE,
)
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.events.models import Event, EventAction
from authentik.lib.generators import generate_id, generate_key
from authentik.providers.oauth2.errors import TokenError
from authentik.providers.oauth2.models import (
    AccessToken,
    AuthorizationCode,
    GrantType,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    RefreshToken,
    ScopeMapping,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.utils import extract_client_auth
from authentik.providers.oauth2.views.token import TokenParams

# Managed scope mappings (loaded by the ``system/providers-oauth2.yaml`` blueprint)
# that the refresh-token view tests attach to their provider.
_MANAGED_SCOPE_MAPPINGS = [
    "goauthentik.io/providers/oauth2/scope-openid",
    "goauthentik.io/providers/oauth2/scope-email",
    "goauthentik.io/providers/oauth2/scope-profile",
    "goauthentik.io/providers/oauth2/scope-offline_access",
]


class TestToken(OAuthTestCase):
    """Test token view"""

    def setUp(self) -> None:
        """Create the request factory and the application providers get assigned to"""
        super().setUp()
        self.factory = RequestFactory()
        self.app = Application.objects.create(name=generate_id(), slug="test")

    # Private helpers; the leading underscore keeps them out of test discovery.

    def _create_provider(self, redirect_uri: str, grant_types: list, **kwargs) -> OAuth2Provider:
        """Create a provider with a single strict redirect URI, signed with the test keypair"""
        return OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            grant_types=grant_types,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect_uri)],
            signing_key=self.keypair,
            **kwargs,
        )

    def _assign_to_app(self, provider: OAuth2Provider) -> None:
        """Assign the provider to the test application, needed for iss to be set"""
        self.app.provider = provider
        self.app.save()

    @staticmethod
    def _set_managed_scopes(provider: OAuth2Provider) -> None:
        """Replace the provider's property mappings with the managed scope mappings"""
        provider.property_mappings.set(
            ScopeMapping.objects.filter(managed__in=_MANAGED_SCOPE_MAPPINGS)
        )

    @staticmethod
    def _basic_auth(provider: OAuth2Provider) -> str:
        """Build the HTTP Basic ``Authorization`` header value for the provider's client"""
        credentials = f"{provider.client_id}:{provider.client_secret}"
        return f"Basic {b64encode(credentials.encode()).decode()}"

    @staticmethod
    def _create_code(provider: OAuth2Provider, user, **kwargs) -> AuthorizationCode:
        """Create the ``foobar`` authorization code for the given provider and user"""
        return AuthorizationCode.objects.create(
            code="foobar",
            provider=provider,
            user=user,
            auth_time=timezone.now(),
            **kwargs,
        )

    @staticmethod
    def _create_offline_refresh_token(provider: OAuth2Provider, user, **kwargs) -> RefreshToken:
        """Create a refresh token with an empty ID token and the ``offline_access`` scope"""
        return RefreshToken.objects.create(
            provider=provider,
            user=user,
            token=generate_id(),
            _id_token=dumps({}),
            auth_time=timezone.now(),
            _scope="offline_access",
            **kwargs,
        )

    @staticmethod
    def _code_grant(code: str, redirect_uri: str) -> dict:
        """Form data for an ``authorization_code`` token request"""
        return {
            "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
            "code": code,
            "redirect_uri": redirect_uri,
        }

    @staticmethod
    def _refresh_grant(refresh_token: str, redirect_uri: str) -> dict:
        """Form data for a ``refresh_token`` token request"""
        return {
            "grant_type": GRANT_TYPE_REFRESH_TOKEN,
            "refresh_token": refresh_token,
            "redirect_uri": redirect_uri,
        }

    def _build_request(self, provider: OAuth2Provider, data: dict):
        """Build (without dispatching) an authenticated POST for ``TokenParams.parse``"""
        return self.factory.post("/", data=data, HTTP_AUTHORIZATION=self._basic_auth(provider))

    def _post_token(self, provider: OAuth2Provider, data: dict, **extra):
        """POST to the token endpoint via the test client, authenticated as the provider"""
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data=data,
            HTTP_AUTHORIZATION=self._basic_auth(provider),
            **extra,
        )

    @staticmethod
    def _expected_body(provider: OAuth2Provider, access, scope: str, refresh_token=None) -> dict:
        """Expected token response JSON; ``refresh_token`` is only included when given"""
        body = {
            "access_token": access.token,
            "token_type": TOKEN_TYPE,
            "expires_in": 3600,
            "id_token": provider.encode(
                access.id_token.to_dict(),
            ),
            "scope": scope,
        }
        if refresh_token is not None:
            body["refresh_token"] = refresh_token
        return body

    # Tests

    def test_invalid_grant_type(self):
        """Parsing fails with grant_type_not_configured if the provider has no grant types"""
        provider = self._create_provider("http://TestServer", [])
        user = create_test_admin_user()
        code = self._create_code(provider, user)
        request = self._build_request(provider, self._code_grant(code.code, "http://TestServer"))
        with self.assertRaises(TokenError) as cm:
            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
        self.assertEqual(cm.exception.cause, "grant_type_not_configured")

    def test_request_auth_code(self):
        """Parse an authorization_code request; a wrong client secret raises TokenError"""
        provider = self._create_provider("http://TestServer", [GrantType.AUTHORIZATION_CODE])
        user = create_test_admin_user()
        code = self._create_code(provider, user)
        request = self._build_request(provider, self._code_grant(code.code, "http://TestServer"))
        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
        self.assertEqual(params.provider, provider)
        with self.assertRaises(TokenError):
            TokenParams.parse(request, provider, provider.client_id, generate_key())

    def test_request_auth_code_invalid(self):
        """Parsing an authorization_code request raises TokenError

        The provider only has the refresh_token grant type configured and no
        authorization code ``foo`` is created."""
        provider = self._create_provider("http://testserver", [GrantType.REFRESH_TOKEN])
        request = self._build_request(provider, self._code_grant("foo", "http://testserver"))
        with self.assertRaises(TokenError):
            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)

    def test_request_refresh_token(self):
        """Parse a refresh_token request for an existing refresh token"""
        provider = self._create_provider("http://local.invalid", [GrantType.REFRESH_TOKEN])
        user = create_test_admin_user()
        token = RefreshToken.objects.create(
            provider=provider,
            user=user,
            token=generate_id(),
            auth_time=timezone.now(),
        )
        request = self._build_request(
            provider, self._refresh_grant(token.token, "http://local.invalid")
        )
        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
        self.assertEqual(params.provider, provider)

    def test_extract_client_auth_basic_auth_percent_decodes(self):
        """test percent-decoding of client credentials in Basic auth"""
        header = b64encode(
            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
        ).decode()
        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))

    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
        """test compatibility with clients that still send raw plus characters"""
        header = b64encode(b"client:secret+plus").decode()
        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))

    def test_auth_code_view(self):
        """Exchange an authorization code at the token endpoint for a signed token"""
        provider = self._create_provider("http://local.invalid", [GrantType.AUTHORIZATION_CODE])
        self._assign_to_app(provider)
        user = create_test_admin_user()
        code = self._create_code(provider, user)
        response = self._post_token(
            provider, self._code_grant(code.code, "http://local.invalid")
        )
        # Fail with a readable message instead of an AttributeError on a missing token
        self.assertEqual(response.status_code, 200)
        access = AccessToken.objects.filter(user=user, provider=provider).first()
        self.assertJSONEqual(
            response.content.decode(),
            self._expected_body(provider, access, ""),
        )
        self.validate_jwt(access, provider)

    def test_auth_code_enc(self):
        """Exchange an authorization code when the provider has an encryption key set"""
        provider = self._create_provider(
            "http://local.invalid",
            [GrantType.AUTHORIZATION_CODE],
            encryption_key=self.keypair,
        )
        self._assign_to_app(provider)
        user = create_test_admin_user()
        code = self._create_code(provider, user)
        response = self._post_token(
            provider, self._code_grant(code.code, "http://local.invalid")
        )
        self.assertEqual(response.status_code, 200)
        access = AccessToken.objects.filter(user=user, provider=provider).first()
        self.validate_jwe(access, provider)

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_view(self):
        """Refresh with an Origin equal to the redirect URI: CORS headers and tokens returned"""
        provider = self._create_provider("http://local.invalid", [GrantType.REFRESH_TOKEN])
        self._set_managed_scopes(provider)
        self._assign_to_app(provider)
        user = create_test_admin_user()
        token = self._create_offline_refresh_token(provider, user)
        response = self._post_token(
            provider,
            self._refresh_grant(token.token, "http://local.invalid"),
            HTTP_ORIGIN="http://local.invalid",
        )
        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
        access = AccessToken.objects.filter(user=user, provider=provider).first()
        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
        self.assertJSONEqual(
            response.content.decode(),
            self._expected_body(provider, access, "offline_access", refresh.token),
        )
        self.validate_jwt(access, provider)

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_view_invalid_origin(self):
        """Refresh with a non-matching Origin: tokens returned but no CORS headers"""
        provider = self._create_provider("http://local.invalid", [GrantType.REFRESH_TOKEN])
        self._set_managed_scopes(provider)
        # NOTE: unlike the other view tests, the provider is not assigned to the application
        user = create_test_admin_user()
        token = self._create_offline_refresh_token(provider, user)
        response = self._post_token(
            provider,
            self._refresh_grant(token.token, "http://local.invalid"),
            HTTP_ORIGIN="http://another.invalid",
        )
        access = AccessToken.objects.filter(user=user, provider=provider).first()
        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
        self.assertNotIn("Access-Control-Allow-Credentials", response)
        self.assertNotIn("Access-Control-Allow-Origin", response)
        self.assertJSONEqual(
            response.content.decode(),
            self._expected_body(provider, access, "offline_access", refresh.token),
        )

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_revoke(self):
        """Re-using an already exchanged refresh token fails and logs a suspicious request"""
        provider = self._create_provider("http://testserver", [GrantType.REFRESH_TOKEN])
        self._set_managed_scopes(provider)
        self._assign_to_app(provider)
        user = create_test_admin_user()
        token = self._create_offline_refresh_token(provider, user)
        # Exchange the initial refresh token, which issues a new refresh token
        response = self._post_token(
            provider, self._refresh_grant(token.token, "http://testserver")
        )
        new_token = RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
        self.assertIsNotNone(new_token)
        # Exchange the newly issued refresh token once -> succeeds
        response = self._post_token(
            provider, self._refresh_grant(new_token.token, "http://local.invalid")
        )
        self.assertEqual(response.status_code, 200)
        # Exchange the same refresh token a second time -> it has been used
        # already and the request should error
        response = self._post_token(
            provider, self._refresh_grant(new_token.token, "http://local.invalid")
        )
        self.assertEqual(response.status_code, 400)
        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_refresh_token_view_threshold(self):
        """refresh token threshold

        threshold set to 1 hour, refresh token expires in 2 hours.
        First request should not return a new refresh token, second request
        runs with the time frozen 1 hour and 10 minutes in the future and
        should return a new refresh token"""
        provider = self._create_provider(
            "http://local.invalid",
            [GrantType.REFRESH_TOKEN],
            refresh_token_threshold="hours=1",  # nosec
        )
        self._set_managed_scopes(provider)
        self._assign_to_app(provider)
        user = create_test_admin_user()
        token = self._create_offline_refresh_token(
            provider,
            user,
            expires=now() + timedelta(hours=2),
        )
        response = self._post_token(
            provider,
            self._refresh_grant(token.token, "http://local.invalid"),
            HTTP_ORIGIN="http://local.invalid",
        )
        access = AccessToken.objects.filter(user=user, provider=provider).first()
        # No "refresh_token" key is expected in the first response
        self.assertJSONEqual(
            response.content.decode(),
            self._expected_body(provider, access, "offline_access"),
        )
        self.validate_jwt(access, provider)

        with freeze_time(now() + timedelta(hours=1, minutes=10)):
            response = self._post_token(
                provider,
                self._refresh_grant(token.token, "http://local.invalid"),
                HTTP_ORIGIN="http://local.invalid",
            )
            access = AccessToken.objects.filter(user=user, provider=provider).first()
            refresh = RefreshToken.objects.filter(user=user, provider=provider).last()
            self.assertJSONEqual(
                response.content.decode(),
                self._expected_body(provider, access, "offline_access", refresh.token),
            )
            self.validate_jwt(access, provider)

    @apply_blueprint("system/providers-oauth2.yaml")
    def test_scope_claim_override_via_property_mapping(self):
        """Test that property mappings can override the scope claim in access tokens.

        See: https://github.com/goauthentik/authentik/issues/19224
        """
        # Create a custom scope mapping that returns a custom scope claim
        custom_scope_mapping = ScopeMapping.objects.create(
            name="custom-scope-override",
            scope_name="custom",
            expression='return {"scope": "custom-scope-value additional-scope"}',
        )

        provider = self._create_provider(
            "http://local.invalid",
            [GrantType.AUTHORIZATION_CODE],
            include_claims_in_id_token=True,
        )
        provider.property_mappings.add(custom_scope_mapping)
        self._assign_to_app(provider)

        user = create_test_admin_user()
        code = self._create_code(
            provider,
            user,
            _scope="openid custom",  # Request the custom scope
        )

        response = self._post_token(
            provider, self._code_grant(code.code, "http://local.invalid")
        )
        self.assertEqual(response.status_code, 200)

        access = AccessToken.objects.filter(user=user, provider=provider).first()
        jwt_data = self.validate_jwt(access, provider)

        # The scope should be the custom value from the property mapping,
        # not the default "openid custom"
        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
41class TestToken(OAuthTestCase): 42 """Test token view""" 43 44 def setUp(self) -> None: 45 super().setUp() 46 self.factory = RequestFactory() 47 self.app = Application.objects.create(name=generate_id(), slug="test") 48 49 def test_invalid_grant_type(self): 50 """test request param""" 51 provider = OAuth2Provider.objects.create( 52 name=generate_id(), 53 authorization_flow=create_test_flow(), 54 grant_types=[], 55 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")], 56 signing_key=self.keypair, 57 ) 58 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 59 user = create_test_admin_user() 60 code = AuthorizationCode.objects.create( 61 code="foobar", provider=provider, user=user, auth_time=timezone.now() 62 ) 63 request = self.factory.post( 64 "/", 65 data={ 66 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 67 "code": code.code, 68 "redirect_uri": "http://TestServer", 69 }, 70 HTTP_AUTHORIZATION=f"Basic {header}", 71 ) 72 with self.assertRaises(TokenError) as cm: 73 TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 74 self.assertEqual(cm.exception.cause, "grant_type_not_configured") 75 76 def test_request_auth_code(self): 77 """test request param""" 78 provider = OAuth2Provider.objects.create( 79 name=generate_id(), 80 authorization_flow=create_test_flow(), 81 grant_types=[GrantType.AUTHORIZATION_CODE], 82 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")], 83 signing_key=self.keypair, 84 ) 85 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 86 user = create_test_admin_user() 87 code = AuthorizationCode.objects.create( 88 code="foobar", provider=provider, user=user, auth_time=timezone.now() 89 ) 90 request = self.factory.post( 91 "/", 92 data={ 93 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 94 "code": code.code, 95 "redirect_uri": "http://TestServer", 96 }, 97 HTTP_AUTHORIZATION=f"Basic {header}", 98 ) 99 
params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 100 self.assertEqual(params.provider, provider) 101 with self.assertRaises(TokenError): 102 TokenParams.parse(request, provider, provider.client_id, generate_key()) 103 104 def test_request_auth_code_invalid(self): 105 """test request param""" 106 provider = OAuth2Provider.objects.create( 107 name=generate_id(), 108 authorization_flow=create_test_flow(), 109 grant_types=[GrantType.REFRESH_TOKEN], 110 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 111 signing_key=self.keypair, 112 ) 113 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 114 request = self.factory.post( 115 "/", 116 data={ 117 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 118 "code": "foo", 119 "redirect_uri": "http://testserver", 120 }, 121 HTTP_AUTHORIZATION=f"Basic {header}", 122 ) 123 with self.assertRaises(TokenError): 124 TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 125 126 def test_request_refresh_token(self): 127 """test request param""" 128 provider = OAuth2Provider.objects.create( 129 name=generate_id(), 130 authorization_flow=create_test_flow(), 131 grant_types=[GrantType.REFRESH_TOKEN], 132 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 133 signing_key=self.keypair, 134 ) 135 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 136 user = create_test_admin_user() 137 token = RefreshToken.objects.create( 138 provider=provider, 139 user=user, 140 token=generate_id(), 141 auth_time=timezone.now(), 142 ) 143 request = self.factory.post( 144 "/", 145 data={ 146 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 147 "refresh_token": token.token, 148 "redirect_uri": "http://local.invalid", 149 }, 150 HTTP_AUTHORIZATION=f"Basic {header}", 151 ) 152 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 153 
self.assertEqual(params.provider, provider) 154 155 def test_extract_client_auth_basic_auth_percent_decodes(self): 156 """test percent-decoding of client credentials in Basic auth""" 157 header = b64encode( 158 f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode() 159 ).decode() 160 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 161 self.assertEqual(extract_client_auth(request), ("client/id", "secret+/==")) 162 163 def test_extract_client_auth_basic_auth_preserves_raw_plus(self): 164 """test compatibility with clients that still send raw plus characters""" 165 header = b64encode(b"client:secret+plus").decode() 166 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 167 self.assertEqual(extract_client_auth(request), ("client", "secret+plus")) 168 169 def test_auth_code_view(self): 170 """test request param""" 171 provider = OAuth2Provider.objects.create( 172 name=generate_id(), 173 authorization_flow=create_test_flow(), 174 grant_types=[GrantType.AUTHORIZATION_CODE], 175 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 176 signing_key=self.keypair, 177 ) 178 # Needs to be assigned to an application for iss to be set 179 self.app.provider = provider 180 self.app.save() 181 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 182 user = create_test_admin_user() 183 code = AuthorizationCode.objects.create( 184 code="foobar", provider=provider, user=user, auth_time=timezone.now() 185 ) 186 response = self.client.post( 187 reverse("authentik_providers_oauth2:token"), 188 data={ 189 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 190 "code": code.code, 191 "redirect_uri": "http://local.invalid", 192 }, 193 HTTP_AUTHORIZATION=f"Basic {header}", 194 ) 195 access = AccessToken.objects.filter(user=user, provider=provider).first() 196 self.assertJSONEqual( 197 response.content.decode(), 198 { 199 "access_token": access.token, 200 "token_type": 
TOKEN_TYPE, 201 "expires_in": 3600, 202 "id_token": provider.encode( 203 access.id_token.to_dict(), 204 ), 205 "scope": "", 206 }, 207 ) 208 self.validate_jwt(access, provider) 209 210 def test_auth_code_enc(self): 211 """test request param""" 212 provider = OAuth2Provider.objects.create( 213 name=generate_id(), 214 authorization_flow=create_test_flow(), 215 grant_types=[GrantType.AUTHORIZATION_CODE], 216 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 217 signing_key=self.keypair, 218 encryption_key=self.keypair, 219 ) 220 # Needs to be assigned to an application for iss to be set 221 self.app.provider = provider 222 self.app.save() 223 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 224 user = create_test_admin_user() 225 code = AuthorizationCode.objects.create( 226 code="foobar", provider=provider, user=user, auth_time=timezone.now() 227 ) 228 response = self.client.post( 229 reverse("authentik_providers_oauth2:token"), 230 data={ 231 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 232 "code": code.code, 233 "redirect_uri": "http://local.invalid", 234 }, 235 HTTP_AUTHORIZATION=f"Basic {header}", 236 ) 237 self.assertEqual(response.status_code, 200) 238 access = AccessToken.objects.filter(user=user, provider=provider).first() 239 self.validate_jwe(access, provider) 240 241 @apply_blueprint("system/providers-oauth2.yaml") 242 def test_refresh_token_view(self): 243 """test request param""" 244 provider = OAuth2Provider.objects.create( 245 name=generate_id(), 246 authorization_flow=create_test_flow(), 247 grant_types=[GrantType.REFRESH_TOKEN], 248 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 249 signing_key=self.keypair, 250 ) 251 provider.property_mappings.set( 252 ScopeMapping.objects.filter( 253 managed__in=[ 254 "goauthentik.io/providers/oauth2/scope-openid", 255 "goauthentik.io/providers/oauth2/scope-email", 256 
"goauthentik.io/providers/oauth2/scope-profile", 257 "goauthentik.io/providers/oauth2/scope-offline_access", 258 ] 259 ) 260 ) 261 # Needs to be assigned to an application for iss to be set 262 self.app.provider = provider 263 self.app.save() 264 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 265 user = create_test_admin_user() 266 token = RefreshToken.objects.create( 267 provider=provider, 268 user=user, 269 token=generate_id(), 270 _id_token=dumps({}), 271 auth_time=timezone.now(), 272 _scope="offline_access", 273 ) 274 response = self.client.post( 275 reverse("authentik_providers_oauth2:token"), 276 data={ 277 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 278 "refresh_token": token.token, 279 "redirect_uri": "http://local.invalid", 280 }, 281 HTTP_AUTHORIZATION=f"Basic {header}", 282 HTTP_ORIGIN="http://local.invalid", 283 ) 284 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 285 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 286 access = AccessToken.objects.filter(user=user, provider=provider).first() 287 refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first() 288 self.assertJSONEqual( 289 response.content.decode(), 290 { 291 "access_token": access.token, 292 "refresh_token": refresh.token, 293 "token_type": TOKEN_TYPE, 294 "expires_in": 3600, 295 "id_token": provider.encode( 296 access.id_token.to_dict(), 297 ), 298 "scope": "offline_access", 299 }, 300 ) 301 self.validate_jwt(access, provider) 302 303 @apply_blueprint("system/providers-oauth2.yaml") 304 def test_refresh_token_view_invalid_origin(self): 305 """test request param""" 306 provider = OAuth2Provider.objects.create( 307 name=generate_id(), 308 authorization_flow=create_test_flow(), 309 grant_types=[GrantType.REFRESH_TOKEN], 310 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 311 signing_key=self.keypair, 312 ) 313 
provider.property_mappings.set( 314 ScopeMapping.objects.filter( 315 managed__in=[ 316 "goauthentik.io/providers/oauth2/scope-openid", 317 "goauthentik.io/providers/oauth2/scope-email", 318 "goauthentik.io/providers/oauth2/scope-profile", 319 "goauthentik.io/providers/oauth2/scope-offline_access", 320 ] 321 ) 322 ) 323 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 324 user = create_test_admin_user() 325 token = RefreshToken.objects.create( 326 provider=provider, 327 user=user, 328 token=generate_id(), 329 _id_token=dumps({}), 330 auth_time=timezone.now(), 331 _scope="offline_access", 332 ) 333 response = self.client.post( 334 reverse("authentik_providers_oauth2:token"), 335 data={ 336 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 337 "refresh_token": token.token, 338 "redirect_uri": "http://local.invalid", 339 }, 340 HTTP_AUTHORIZATION=f"Basic {header}", 341 HTTP_ORIGIN="http://another.invalid", 342 ) 343 access = AccessToken.objects.filter(user=user, provider=provider).first() 344 refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first() 345 self.assertNotIn("Access-Control-Allow-Credentials", response) 346 self.assertNotIn("Access-Control-Allow-Origin", response) 347 self.assertJSONEqual( 348 response.content.decode(), 349 { 350 "access_token": access.token, 351 "refresh_token": refresh.token, 352 "token_type": TOKEN_TYPE, 353 "expires_in": 3600, 354 "id_token": provider.encode( 355 access.id_token.to_dict(), 356 ), 357 "scope": "offline_access", 358 }, 359 ) 360 361 @apply_blueprint("system/providers-oauth2.yaml") 362 def test_refresh_token_revoke(self): 363 """test request param""" 364 provider = OAuth2Provider.objects.create( 365 name=generate_id(), 366 authorization_flow=create_test_flow(), 367 grant_types=[GrantType.REFRESH_TOKEN], 368 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 369 signing_key=self.keypair, 370 ) 371 provider.property_mappings.set( 372 
ScopeMapping.objects.filter( 373 managed__in=[ 374 "goauthentik.io/providers/oauth2/scope-openid", 375 "goauthentik.io/providers/oauth2/scope-email", 376 "goauthentik.io/providers/oauth2/scope-profile", 377 "goauthentik.io/providers/oauth2/scope-offline_access", 378 ] 379 ) 380 ) 381 # Needs to be assigned to an application for iss to be set 382 self.app.provider = provider 383 self.app.save() 384 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 385 user = create_test_admin_user() 386 token = RefreshToken.objects.create( 387 provider=provider, 388 user=user, 389 token=generate_id(), 390 _id_token=dumps({}), 391 auth_time=timezone.now(), 392 _scope="offline_access", 393 ) 394 # Create initial refresh token 395 response = self.client.post( 396 reverse("authentik_providers_oauth2:token"), 397 data={ 398 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 399 "refresh_token": token.token, 400 "redirect_uri": "http://testserver", 401 }, 402 HTTP_AUTHORIZATION=f"Basic {header}", 403 ) 404 new_token = RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first() 405 # Post again with initial token -> get new refresh token 406 # and revoke old one 407 response = self.client.post( 408 reverse("authentik_providers_oauth2:token"), 409 data={ 410 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 411 "refresh_token": new_token.token, 412 "redirect_uri": "http://local.invalid", 413 }, 414 HTTP_AUTHORIZATION=f"Basic {header}", 415 ) 416 self.assertEqual(response.status_code, 200) 417 # Post again with old token, is now revoked and should error 418 response = self.client.post( 419 reverse("authentik_providers_oauth2:token"), 420 data={ 421 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 422 "refresh_token": new_token.token, 423 "redirect_uri": "http://local.invalid", 424 }, 425 HTTP_AUTHORIZATION=f"Basic {header}", 426 ) 427 self.assertEqual(response.status_code, 400) 428 self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists()) 429 430 
@apply_blueprint("system/providers-oauth2.yaml") 431 def test_refresh_token_view_threshold(self): 432 """refresh token threshold 433 434 threshold set to 1 hour, refresh token expires in 2 hours. 435 First request should not return a new refresh token, second request 436 has a fake time 1 hours in the future which should return a new access token""" 437 provider = OAuth2Provider.objects.create( 438 name=generate_id(), 439 authorization_flow=create_test_flow(), 440 grant_types=[GrantType.REFRESH_TOKEN], 441 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 442 signing_key=self.keypair, 443 refresh_token_threshold="hours=1", # nosec 444 ) 445 provider.property_mappings.set( 446 ScopeMapping.objects.filter( 447 managed__in=[ 448 "goauthentik.io/providers/oauth2/scope-openid", 449 "goauthentik.io/providers/oauth2/scope-email", 450 "goauthentik.io/providers/oauth2/scope-profile", 451 "goauthentik.io/providers/oauth2/scope-offline_access", 452 ] 453 ) 454 ) 455 # Needs to be assigned to an application for iss to be set 456 self.app.provider = provider 457 self.app.save() 458 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 459 user = create_test_admin_user() 460 token = RefreshToken.objects.create( 461 provider=provider, 462 user=user, 463 token=generate_id(), 464 _id_token=dumps({}), 465 auth_time=timezone.now(), 466 _scope="offline_access", 467 expires=now() + timedelta(hours=2), 468 ) 469 response = self.client.post( 470 reverse("authentik_providers_oauth2:token"), 471 data={ 472 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 473 "refresh_token": token.token, 474 "redirect_uri": "http://local.invalid", 475 }, 476 HTTP_AUTHORIZATION=f"Basic {header}", 477 HTTP_ORIGIN="http://local.invalid", 478 ) 479 access = AccessToken.objects.filter(user=user, provider=provider).first() 480 self.assertJSONEqual( 481 response.content.decode(), 482 { 483 "access_token": access.token, 484 "token_type": TOKEN_TYPE, 485 
"expires_in": 3600, 486 "id_token": provider.encode( 487 access.id_token.to_dict(), 488 ), 489 "scope": "offline_access", 490 }, 491 ) 492 self.validate_jwt(access, provider) 493 494 with freeze_time(now() + timedelta(hours=1, minutes=10)): 495 response = self.client.post( 496 reverse("authentik_providers_oauth2:token"), 497 data={ 498 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 499 "refresh_token": token.token, 500 "redirect_uri": "http://local.invalid", 501 }, 502 HTTP_AUTHORIZATION=f"Basic {header}", 503 HTTP_ORIGIN="http://local.invalid", 504 ) 505 access = AccessToken.objects.filter(user=user, provider=provider).first() 506 refresh = RefreshToken.objects.filter(user=user, provider=provider).last() 507 self.assertJSONEqual( 508 response.content.decode(), 509 { 510 "access_token": access.token, 511 "token_type": TOKEN_TYPE, 512 "expires_in": 3600, 513 "id_token": provider.encode( 514 access.id_token.to_dict(), 515 ), 516 "scope": "offline_access", 517 "refresh_token": refresh.token, 518 }, 519 ) 520 self.validate_jwt(access, provider) 521 522 @apply_blueprint("system/providers-oauth2.yaml") 523 def test_scope_claim_override_via_property_mapping(self): 524 """Test that property mappings can override the scope claim in access tokens. 
525 526 See: https://github.com/goauthentik/authentik/issues/19224 527 """ 528 # Create a custom scope mapping that returns a custom scope claim 529 custom_scope_mapping = ScopeMapping.objects.create( 530 name="custom-scope-override", 531 scope_name="custom", 532 expression='return {"scope": "custom-scope-value additional-scope"}', 533 ) 534 535 provider = OAuth2Provider.objects.create( 536 name=generate_id(), 537 authorization_flow=create_test_flow(), 538 grant_types=[GrantType.AUTHORIZATION_CODE], 539 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 540 signing_key=self.keypair, 541 include_claims_in_id_token=True, 542 ) 543 provider.property_mappings.add(custom_scope_mapping) 544 545 # Needs to be assigned to an application for iss to be set 546 self.app.provider = provider 547 self.app.save() 548 549 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 550 user = create_test_admin_user() 551 code = AuthorizationCode.objects.create( 552 code="foobar", 553 provider=provider, 554 user=user, 555 auth_time=timezone.now(), 556 _scope="openid custom", # Request the custom scope 557 ) 558 559 response = self.client.post( 560 reverse("authentik_providers_oauth2:token"), 561 data={ 562 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 563 "code": code.code, 564 "redirect_uri": "http://local.invalid", 565 }, 566 HTTP_AUTHORIZATION=f"Basic {header}", 567 ) 568 self.assertEqual(response.status_code, 200) 569 570 access = AccessToken.objects.filter(user=user, provider=provider).first() 571 jwt_data = self.validate_jwt(access, provider) 572 573 # The scope should be the custom value from the property mapping, 574 # not the default "openid custom" 575 self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
Test token view
def
setUp(self) -> None:
def setUp(self) -> None:
    """Create the request factory and the application shared by all tests."""
    super().setUp()
    # Factory for tests that build raw requests instead of using the test client
    self.factory = RequestFactory()
    app_name = generate_id()
    self.app = Application.objects.create(name=app_name, slug="test")
Hook method for setting up the test fixture before exercising it.
def
test_invalid_grant_type(self):
def test_invalid_grant_type(self):
    """Parsing is rejected when the provider has no grant types configured"""
    redirect = "http://TestServer"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        # Nothing is enabled, so the authorization_code grant below is not allowed
        grant_types=[],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
    )
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    auth_code = AuthorizationCode.objects.create(
        code="foobar",
        provider=provider,
        user=admin,
        auth_time=timezone.now(),
    )
    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "code": auth_code.code,
        "redirect_uri": redirect,
    }
    request = self.factory.post("/", data=payload, HTTP_AUTHORIZATION=f"Basic {basic_auth}")
    with self.assertRaises(TokenError) as cm:
        TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
    # Checked after the context manager exits, once the exception has been captured
    raised = cm.exception
    self.assertEqual(raised.cause, "grant_type_not_configured")
test request param
def
test_request_auth_code(self):
def test_request_auth_code(self):
    """An authorization-code request parses with the right secret and fails with a wrong one"""
    redirect = "http://TestServer"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.AUTHORIZATION_CODE],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
    )
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    auth_code = AuthorizationCode.objects.create(
        code="foobar",
        provider=provider,
        user=admin,
        auth_time=timezone.now(),
    )
    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "code": auth_code.code,
        "redirect_uri": redirect,
    }
    request = self.factory.post("/", data=payload, HTTP_AUTHORIZATION=f"Basic {basic_auth}")

    # Correct client secret: parsing succeeds and is bound to our provider
    parsed = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
    self.assertEqual(parsed.provider, provider)

    # Freshly generated (i.e. wrong) client secret: parsing must raise
    wrong_secret = generate_key()
    with self.assertRaises(TokenError):
        TokenParams.parse(request, provider, provider.client_id, wrong_secret)
test request param
def
test_request_auth_code_invalid(self):
def test_request_auth_code_invalid(self):
    """An authorization-code request with the code "foo" fails to parse"""
    redirect = "http://testserver"
    # NOTE(review): only REFRESH_TOKEN is enabled here while the request uses the
    # authorization_code grant, so the TokenError may come from the grant type
    # rather than from the unknown code -- confirm which path is intended.
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.REFRESH_TOKEN],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
    )
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "code": "foo",
        "redirect_uri": redirect,
    }
    request = self.factory.post("/", data=payload, HTTP_AUTHORIZATION=f"Basic {basic_auth}")
    with self.assertRaises(TokenError):
        TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
test request param
def
test_request_refresh_token(self):
def test_request_refresh_token(self):
    """A refresh-token request parses and resolves to the issuing provider"""
    redirect = "http://local.invalid"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.REFRESH_TOKEN],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
    )
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    stored_token = RefreshToken.objects.create(
        provider=provider,
        user=admin,
        token=generate_id(),
        auth_time=timezone.now(),
    )
    payload = {
        "grant_type": GRANT_TYPE_REFRESH_TOKEN,
        "refresh_token": stored_token.token,
        "redirect_uri": redirect,
    }
    request = self.factory.post("/", data=payload, HTTP_AUTHORIZATION=f"Basic {basic_auth}")
    parsed = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
    self.assertEqual(parsed.provider, provider)
test request param
def
test_extract_client_auth_basic_auth_percent_decodes(self):
def test_extract_client_auth_basic_auth_percent_decodes(self):
    """Percent-encoded client id and secret in Basic auth are returned decoded"""
    # safe="" forces "/" to be percent-encoded as well
    encoded_id = quote("client/id", safe="")
    encoded_secret = quote("secret+/==", safe="")
    basic_auth = b64encode(f"{encoded_id}:{encoded_secret}".encode()).decode()
    request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {basic_auth}")
    expected = ("client/id", "secret+/==")
    self.assertEqual(extract_client_auth(request), expected)
test percent-decoding of client credentials in Basic auth
def
test_extract_client_auth_basic_auth_preserves_raw_plus(self):
def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
    """A literal "+" in the Basic auth secret is returned unchanged"""
    raw_credentials = b"client:secret+plus"
    basic_auth = b64encode(raw_credentials).decode()
    request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {basic_auth}")
    expected = ("client", "secret+plus")
    self.assertEqual(extract_client_auth(request), expected)
test compatibility with clients that still send raw plus characters
def
test_auth_code_view(self):
def test_auth_code_view(self):
    """Exchanging an authorization code at the token endpoint returns access and ID tokens"""
    redirect = "http://local.invalid"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.AUTHORIZATION_CODE],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
    )
    # Needs to be assigned to an application for iss to be set
    self.app.provider = provider
    self.app.save()
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    auth_code = AuthorizationCode.objects.create(
        code="foobar",
        provider=provider,
        user=admin,
        auth_time=timezone.now(),
    )
    token_url = reverse("authentik_providers_oauth2:token")
    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "code": auth_code.code,
        "redirect_uri": redirect,
    }
    response = self.client.post(
        token_url,
        data=payload,
        HTTP_AUTHORIZATION=f"Basic {basic_auth}",
    )
    access = AccessToken.objects.filter(user=admin, provider=provider).first()
    body = response.content.decode()
    expected = {
        "access_token": access.token,
        "token_type": TOKEN_TYPE,
        "expires_in": 3600,
        "id_token": provider.encode(access.id_token.to_dict()),
        # The authorization code was created without any scope
        "scope": "",
    }
    self.assertJSONEqual(body, expected)
    self.validate_jwt(access, provider)
test request param
def
test_auth_code_enc(self):
def test_auth_code_enc(self):
    """With an encryption key configured, the issued token validates as a JWE"""
    redirect = "http://local.invalid"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.AUTHORIZATION_CODE],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
        # Same keypair is used for signing and for encryption
        encryption_key=self.keypair,
    )
    # Needs to be assigned to an application for iss to be set
    self.app.provider = provider
    self.app.save()
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    auth_code = AuthorizationCode.objects.create(
        code="foobar",
        provider=provider,
        user=admin,
        auth_time=timezone.now(),
    )
    token_url = reverse("authentik_providers_oauth2:token")
    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "code": auth_code.code,
        "redirect_uri": redirect,
    }
    response = self.client.post(
        token_url,
        data=payload,
        HTTP_AUTHORIZATION=f"Basic {basic_auth}",
    )
    self.assertEqual(response.status_code, 200)
    access = AccessToken.objects.filter(user=admin, provider=provider).first()
    self.validate_jwe(access, provider)
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_view(self):
@apply_blueprint("system/providers-oauth2.yaml")
def test_refresh_token_view(self):
    """Refreshing from an allowed origin returns new tokens and CORS headers"""
    origin = "http://local.invalid"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.REFRESH_TOKEN],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, origin)],
        signing_key=self.keypair,
    )
    managed_scopes = [
        "goauthentik.io/providers/oauth2/scope-openid",
        "goauthentik.io/providers/oauth2/scope-email",
        "goauthentik.io/providers/oauth2/scope-profile",
        "goauthentik.io/providers/oauth2/scope-offline_access",
    ]
    provider.property_mappings.set(ScopeMapping.objects.filter(managed__in=managed_scopes))
    # Needs to be assigned to an application for iss to be set
    self.app.provider = provider
    self.app.save()
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    seed_token = RefreshToken.objects.create(
        provider=provider,
        user=admin,
        token=generate_id(),
        _id_token=dumps({}),
        auth_time=timezone.now(),
        _scope="offline_access",
    )
    payload = {
        "grant_type": GRANT_TYPE_REFRESH_TOKEN,
        "refresh_token": seed_token.token,
        "redirect_uri": origin,
    }
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data=payload,
        HTTP_AUTHORIZATION=f"Basic {basic_auth}",
        HTTP_ORIGIN=origin,
    )
    # The Origin header matches the configured redirect URI, so CORS headers are sent
    self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
    self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
    access = AccessToken.objects.filter(user=admin, provider=provider).first()
    refresh = RefreshToken.objects.filter(user=admin, provider=provider, revoked=False).first()
    body = response.content.decode()
    expected = {
        "access_token": access.token,
        "refresh_token": refresh.token,
        "token_type": TOKEN_TYPE,
        "expires_in": 3600,
        "id_token": provider.encode(access.id_token.to_dict()),
        "scope": "offline_access",
    }
    self.assertJSONEqual(body, expected)
    self.validate_jwt(access, provider)
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_view_invalid_origin(self):
@apply_blueprint("system/providers-oauth2.yaml")
def test_refresh_token_view_invalid_origin(self):
    """Refreshing from a foreign origin still returns tokens but no CORS headers"""
    redirect = "http://local.invalid"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.REFRESH_TOKEN],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
    )
    managed_scopes = [
        "goauthentik.io/providers/oauth2/scope-openid",
        "goauthentik.io/providers/oauth2/scope-email",
        "goauthentik.io/providers/oauth2/scope-profile",
        "goauthentik.io/providers/oauth2/scope-offline_access",
    ]
    provider.property_mappings.set(ScopeMapping.objects.filter(managed__in=managed_scopes))
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    seed_token = RefreshToken.objects.create(
        provider=provider,
        user=admin,
        token=generate_id(),
        _id_token=dumps({}),
        auth_time=timezone.now(),
        _scope="offline_access",
    )
    payload = {
        "grant_type": GRANT_TYPE_REFRESH_TOKEN,
        "refresh_token": seed_token.token,
        "redirect_uri": redirect,
    }
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data=payload,
        HTTP_AUTHORIZATION=f"Basic {basic_auth}",
        # Origin differs from the provider's only redirect URI
        HTTP_ORIGIN="http://another.invalid",
    )
    access = AccessToken.objects.filter(user=admin, provider=provider).first()
    refresh = RefreshToken.objects.filter(user=admin, provider=provider, revoked=False).first()
    for cors_header in ("Access-Control-Allow-Credentials", "Access-Control-Allow-Origin"):
        self.assertNotIn(cors_header, response)
    body = response.content.decode()
    expected = {
        "access_token": access.token,
        "refresh_token": refresh.token,
        "token_type": TOKEN_TYPE,
        "expires_in": 3600,
        "id_token": provider.encode(access.id_token.to_dict()),
        "scope": "offline_access",
    }
    self.assertJSONEqual(body, expected)
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_revoke(self):
@apply_blueprint("system/providers-oauth2.yaml")
def test_refresh_token_revoke(self):
    """Replaying an already-exchanged refresh token is rejected and logged.

    The seeded refresh token is exchanged once, the refresh token issued by that
    exchange is exchanged in turn, and is then sent a second time. The replay must
    return HTTP 400 and leave a ``SUSPICIOUS_REQUEST`` event behind.
    """
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.REFRESH_TOKEN],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
        signing_key=self.keypair,
    )
    provider.property_mappings.set(
        ScopeMapping.objects.filter(
            managed__in=[
                "goauthentik.io/providers/oauth2/scope-openid",
                "goauthentik.io/providers/oauth2/scope-email",
                "goauthentik.io/providers/oauth2/scope-profile",
                "goauthentik.io/providers/oauth2/scope-offline_access",
            ]
        )
    )
    # Needs to be assigned to an application for iss to be set
    self.app.provider = provider
    self.app.save()
    header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
    user = create_test_admin_user()
    token = RefreshToken.objects.create(
        provider=provider,
        user=user,
        token=generate_id(),
        _id_token=dumps({}),
        auth_time=timezone.now(),
        _scope="offline_access",
    )
    # Exchange the seeded refresh token; this issues a second refresh token
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_REFRESH_TOKEN,
            "refresh_token": token.token,
            "redirect_uri": "http://testserver",
        },
        HTTP_AUTHORIZATION=f"Basic {header}",
    )
    # Fail here with a clear message instead of an AttributeError on None below
    self.assertEqual(response.status_code, 200)
    new_token = RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
    self.assertIsNotNone(new_token)
    # First use of the newly issued refresh token -> succeeds
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_REFRESH_TOKEN,
            "refresh_token": new_token.token,
            "redirect_uri": "http://local.invalid",
        },
        HTTP_AUTHORIZATION=f"Basic {header}",
    )
    self.assertEqual(response.status_code, 200)
    # Second use of that same refresh token -> it is now revoked and must error
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_REFRESH_TOKEN,
            "refresh_token": new_token.token,
            "redirect_uri": "http://local.invalid",
        },
        HTTP_AUTHORIZATION=f"Basic {header}",
    )
    self.assertEqual(response.status_code, 400)
    self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_view_threshold(self):
@apply_blueprint("system/providers-oauth2.yaml")
def test_refresh_token_view_threshold(self):
    """Refresh token threshold

    The threshold is one hour and the stored refresh token expires in two hours.
    The first request must not return a refresh token. The second request runs
    with the clock frozen 1 hour 10 minutes ahead and must return one."""
    redirect = "http://local.invalid"
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.REFRESH_TOKEN],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
        refresh_token_threshold="hours=1",  # nosec
    )
    managed_scopes = [
        "goauthentik.io/providers/oauth2/scope-openid",
        "goauthentik.io/providers/oauth2/scope-email",
        "goauthentik.io/providers/oauth2/scope-profile",
        "goauthentik.io/providers/oauth2/scope-offline_access",
    ]
    provider.property_mappings.set(ScopeMapping.objects.filter(managed__in=managed_scopes))
    # Needs to be assigned to an application for iss to be set
    self.app.provider = provider
    self.app.save()
    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    seed_token = RefreshToken.objects.create(
        provider=provider,
        user=admin,
        token=generate_id(),
        _id_token=dumps({}),
        auth_time=timezone.now(),
        _scope="offline_access",
        expires=now() + timedelta(hours=2),
    )

    # First request: made at the real current time
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_REFRESH_TOKEN,
            "refresh_token": seed_token.token,
            "redirect_uri": redirect,
        },
        HTTP_AUTHORIZATION=f"Basic {basic_auth}",
        HTTP_ORIGIN=redirect,
    )
    access = AccessToken.objects.filter(user=admin, provider=provider).first()
    body = response.content.decode()
    expected_without_refresh = {
        "access_token": access.token,
        "token_type": TOKEN_TYPE,
        "expires_in": 3600,
        "id_token": provider.encode(access.id_token.to_dict()),
        "scope": "offline_access",
    }
    self.assertJSONEqual(body, expected_without_refresh)
    self.validate_jwt(access, provider)

    # Second request: same refresh token, clock frozen 1h10m in the future
    frozen_at = now() + timedelta(hours=1, minutes=10)
    with freeze_time(frozen_at):
        response = self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data={
                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
                "refresh_token": seed_token.token,
                "redirect_uri": redirect,
            },
            HTTP_AUTHORIZATION=f"Basic {basic_auth}",
            HTTP_ORIGIN=redirect,
        )
        access = AccessToken.objects.filter(user=admin, provider=provider).first()
        refresh = RefreshToken.objects.filter(user=admin, provider=provider).last()
        body = response.content.decode()
        expected_with_refresh = {
            "access_token": access.token,
            "token_type": TOKEN_TYPE,
            "expires_in": 3600,
            "id_token": provider.encode(access.id_token.to_dict()),
            "scope": "offline_access",
            "refresh_token": refresh.token,
        }
        self.assertJSONEqual(body, expected_with_refresh)
        self.validate_jwt(access, provider)
refresh token threshold
The threshold is set to 1 hour and the refresh token expires in 2 hours. The first request should not return a new refresh token; the second request is made with the clock faked 1 hour and 10 minutes in the future, which should return a new refresh token.
@apply_blueprint('system/providers-oauth2.yaml')
def
test_scope_claim_override_via_property_mapping(self):
@apply_blueprint("system/providers-oauth2.yaml")
def test_scope_claim_override_via_property_mapping(self):
    """A scope mapping that returns a ``scope`` key replaces the access token's scope claim.

    See: https://github.com/goauthentik/authentik/issues/19224
    """
    redirect = "http://local.invalid"
    # Mapping bound to the "custom" scope whose expression returns its own scope claim
    override_mapping = ScopeMapping.objects.create(
        name="custom-scope-override",
        scope_name="custom",
        expression='return {"scope": "custom-scope-value additional-scope"}',
    )

    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        grant_types=[GrantType.AUTHORIZATION_CODE],
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, redirect)],
        signing_key=self.keypair,
        include_claims_in_id_token=True,
    )
    provider.property_mappings.add(override_mapping)

    # Needs to be assigned to an application for iss to be set
    self.app.provider = provider
    self.app.save()

    raw_credentials = f"{provider.client_id}:{provider.client_secret}"
    basic_auth = b64encode(raw_credentials.encode()).decode()
    admin = create_test_admin_user()
    auth_code = AuthorizationCode.objects.create(
        code="foobar",
        provider=provider,
        user=admin,
        auth_time=timezone.now(),
        # Request the custom scope alongside openid
        _scope="openid custom",
    )

    payload = {
        "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
        "code": auth_code.code,
        "redirect_uri": redirect,
    }
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data=payload,
        HTTP_AUTHORIZATION=f"Basic {basic_auth}",
    )
    self.assertEqual(response.status_code, 200)

    access = AccessToken.objects.filter(user=admin, provider=provider).first()
    claims = self.validate_jwt(access, provider)

    # Expect the value produced by the mapping, not the requested "openid custom"
    self.assertEqual(claims["scope"], "custom-scope-value additional-scope")
Test that property mappings can override the scope claim in access tokens.