authentik.providers.oauth2.tests.test_token
Test token view
1"""Test token view""" 2 3from base64 import b64encode 4from json import dumps 5from urllib.parse import quote 6 7from django.test import RequestFactory 8from django.urls import reverse 9from django.utils import timezone 10 11from authentik.blueprints.tests import apply_blueprint 12from authentik.common.oauth.constants import ( 13 GRANT_TYPE_AUTHORIZATION_CODE, 14 GRANT_TYPE_REFRESH_TOKEN, 15 TOKEN_TYPE, 16) 17from authentik.core.models import Application 18from authentik.core.tests.utils import create_test_admin_user, create_test_flow 19from authentik.events.models import Event, EventAction 20from authentik.lib.generators import generate_id, generate_key 21from authentik.providers.oauth2.errors import TokenError 22from authentik.providers.oauth2.models import ( 23 AccessToken, 24 AuthorizationCode, 25 OAuth2Provider, 26 RedirectURI, 27 RedirectURIMatchingMode, 28 RefreshToken, 29 ScopeMapping, 30) 31from authentik.providers.oauth2.tests.utils import OAuthTestCase 32from authentik.providers.oauth2.utils import extract_client_auth 33from authentik.providers.oauth2.views.token import TokenParams 34 35 36class TestToken(OAuthTestCase): 37 """Test token view""" 38 39 def setUp(self) -> None: 40 super().setUp() 41 self.factory = RequestFactory() 42 self.app = Application.objects.create(name=generate_id(), slug="test") 43 44 def test_request_auth_code(self): 45 """test request param""" 46 provider = OAuth2Provider.objects.create( 47 name=generate_id(), 48 authorization_flow=create_test_flow(), 49 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")], 50 signing_key=self.keypair, 51 ) 52 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 53 user = create_test_admin_user() 54 code = AuthorizationCode.objects.create( 55 code="foobar", provider=provider, user=user, auth_time=timezone.now() 56 ) 57 request = self.factory.post( 58 "/", 59 data={ 60 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 61 "code": code.code, 62 "redirect_uri": "http://TestServer", 63 }, 64 HTTP_AUTHORIZATION=f"Basic {header}", 65 ) 66 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 67 self.assertEqual(params.provider, provider) 68 with self.assertRaises(TokenError): 69 TokenParams.parse(request, provider, provider.client_id, generate_key()) 70 71 def test_request_auth_code_invalid(self): 72 """test request param""" 73 provider = OAuth2Provider.objects.create( 74 name=generate_id(), 75 authorization_flow=create_test_flow(), 76 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 77 signing_key=self.keypair, 78 ) 79 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 80 request = self.factory.post( 81 "/", 82 data={ 83 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 84 "code": "foo", 85 "redirect_uri": "http://testserver", 86 }, 87 HTTP_AUTHORIZATION=f"Basic {header}", 88 ) 89 with self.assertRaises(TokenError): 90 TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 91 92 def test_request_refresh_token(self): 93 """test request param""" 94 provider = OAuth2Provider.objects.create( 95 name=generate_id(), 96 authorization_flow=create_test_flow(), 97 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 98 signing_key=self.keypair, 99 ) 100 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 101 user = create_test_admin_user() 102 token: RefreshToken = RefreshToken.objects.create( 103 provider=provider, 104 user=user, 105 token=generate_id(), 106 auth_time=timezone.now(), 107 ) 108 request = self.factory.post( 109 "/", 110 data={ 111 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 112 "refresh_token": token.token, 113 "redirect_uri": "http://local.invalid", 114 }, 115 HTTP_AUTHORIZATION=f"Basic {header}", 116 ) 117 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 118 self.assertEqual(params.provider, provider) 119 120 def test_extract_client_auth_basic_auth_percent_decodes(self): 121 """test percent-decoding of client credentials in Basic auth""" 122 header = b64encode( 123 f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode() 124 ).decode() 125 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 126 self.assertEqual(extract_client_auth(request), ("client/id", "secret+/==")) 127 128 def test_extract_client_auth_basic_auth_preserves_raw_plus(self): 129 """test compatibility with clients that still send raw plus characters""" 130 header = b64encode(b"client:secret+plus").decode() 131 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 132 self.assertEqual(extract_client_auth(request), ("client", "secret+plus")) 133 134 def test_auth_code_view(self): 135 """test request param""" 136 provider = OAuth2Provider.objects.create( 137 name=generate_id(), 138 authorization_flow=create_test_flow(), 139 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 140 signing_key=self.keypair, 141 ) 142 # Needs to be assigned to an application for iss to be set 143 self.app.provider = provider 144 self.app.save() 145 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 146 user = create_test_admin_user() 147 code = AuthorizationCode.objects.create( 148 code="foobar", provider=provider, user=user, auth_time=timezone.now() 149 ) 150 response = self.client.post( 151 reverse("authentik_providers_oauth2:token"), 152 data={ 153 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 154 "code": code.code, 155 "redirect_uri": "http://local.invalid", 156 }, 157 HTTP_AUTHORIZATION=f"Basic {header}", 158 ) 159 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 160 self.assertJSONEqual( 161 response.content.decode(), 162 { 163 "access_token": access.token, 164 "token_type": TOKEN_TYPE, 165 "expires_in": 3600, 166 "id_token": provider.encode( 167 access.id_token.to_dict(), 168 ), 169 "scope": "", 170 }, 171 ) 172 self.validate_jwt(access, provider) 173 174 def test_auth_code_enc(self): 175 """test request param""" 176 provider = OAuth2Provider.objects.create( 177 name=generate_id(), 178 authorization_flow=create_test_flow(), 179 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 180 signing_key=self.keypair, 181 encryption_key=self.keypair, 182 ) 183 # Needs to be assigned to an application for iss to be set 184 self.app.provider = provider 185 self.app.save() 186 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 187 user = create_test_admin_user() 188 code = AuthorizationCode.objects.create( 189 code="foobar", provider=provider, user=user, auth_time=timezone.now() 190 ) 191 response = self.client.post( 192 reverse("authentik_providers_oauth2:token"), 193 data={ 194 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 195 "code": code.code, 196 "redirect_uri": "http://local.invalid", 197 }, 198 HTTP_AUTHORIZATION=f"Basic {header}", 199 ) 200 self.assertEqual(response.status_code, 200) 201 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 202 self.validate_jwe(access, provider) 203 204 @apply_blueprint("system/providers-oauth2.yaml") 205 def test_refresh_token_view(self): 206 """test request param""" 207 provider = OAuth2Provider.objects.create( 208 name=generate_id(), 209 authorization_flow=create_test_flow(), 210 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 211 signing_key=self.keypair, 212 ) 213 provider.property_mappings.set( 214 ScopeMapping.objects.filter( 215 managed__in=[ 216 "goauthentik.io/providers/oauth2/scope-openid", 217 "goauthentik.io/providers/oauth2/scope-email", 218 "goauthentik.io/providers/oauth2/scope-profile", 219 "goauthentik.io/providers/oauth2/scope-offline_access", 220 ] 221 ) 222 ) 223 # Needs to be assigned to an application for iss to be set 224 self.app.provider = provider 225 self.app.save() 226 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 227 user = create_test_admin_user() 228 token: RefreshToken = RefreshToken.objects.create( 229 provider=provider, 230 user=user, 231 token=generate_id(), 232 _id_token=dumps({}), 233 auth_time=timezone.now(), 234 _scope="offline_access", 235 ) 236 response = self.client.post( 237 reverse("authentik_providers_oauth2:token"), 238 data={ 239 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 240 "refresh_token": token.token, 241 "redirect_uri": "http://local.invalid", 242 }, 243 HTTP_AUTHORIZATION=f"Basic {header}", 244 HTTP_ORIGIN="http://local.invalid", 245 ) 246 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 247 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 248 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 249 refresh: RefreshToken = RefreshToken.objects.filter( 250 user=user, provider=provider, revoked=False 251 ).first() 252 self.assertJSONEqual( 253 response.content.decode(), 254 { 255 "access_token": access.token, 256 "refresh_token": refresh.token, 257 "token_type": TOKEN_TYPE, 258 "expires_in": 3600, 259 "id_token": provider.encode( 260 access.id_token.to_dict(), 261 ), 262 "scope": "offline_access", 263 }, 264 ) 265 self.validate_jwt(access, provider) 266 267 @apply_blueprint("system/providers-oauth2.yaml") 268 def test_refresh_token_view_invalid_origin(self): 269 """test request param""" 270 provider = OAuth2Provider.objects.create( 271 name=generate_id(), 272 authorization_flow=create_test_flow(), 273 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 274 signing_key=self.keypair, 275 ) 276 provider.property_mappings.set( 277 ScopeMapping.objects.filter( 278 managed__in=[ 279 "goauthentik.io/providers/oauth2/scope-openid", 280 "goauthentik.io/providers/oauth2/scope-email", 281 "goauthentik.io/providers/oauth2/scope-profile", 282 "goauthentik.io/providers/oauth2/scope-offline_access", 283 ] 284 ) 285 ) 286 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 287 user = create_test_admin_user() 288 token: RefreshToken = RefreshToken.objects.create( 289 provider=provider, 290 user=user, 291 token=generate_id(), 292 _id_token=dumps({}), 293 auth_time=timezone.now(), 294 _scope="offline_access", 295 ) 296 response = self.client.post( 297 reverse("authentik_providers_oauth2:token"), 298 data={ 299 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 300 "refresh_token": token.token, 301 "redirect_uri": "http://local.invalid", 302 }, 303 HTTP_AUTHORIZATION=f"Basic {header}", 304 HTTP_ORIGIN="http://another.invalid", 305 ) 306 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 307 refresh: RefreshToken = RefreshToken.objects.filter( 308 user=user, provider=provider, revoked=False 309 ).first() 310 self.assertNotIn("Access-Control-Allow-Credentials", response) 311 self.assertNotIn("Access-Control-Allow-Origin", response) 312 self.assertJSONEqual( 313 response.content.decode(), 314 { 315 "access_token": access.token, 316 "refresh_token": refresh.token, 317 "token_type": TOKEN_TYPE, 318 "expires_in": 3600, 319 "id_token": provider.encode( 320 access.id_token.to_dict(), 321 ), 322 "scope": "offline_access", 323 }, 324 ) 325 326 @apply_blueprint("system/providers-oauth2.yaml") 327 def test_refresh_token_revoke(self): 328 """test request param""" 329 provider = OAuth2Provider.objects.create( 330 name=generate_id(), 331 authorization_flow=create_test_flow(), 332 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 333 signing_key=self.keypair, 334 ) 335 provider.property_mappings.set( 336 ScopeMapping.objects.filter( 337 managed__in=[ 338 "goauthentik.io/providers/oauth2/scope-openid", 339 "goauthentik.io/providers/oauth2/scope-email", 340 "goauthentik.io/providers/oauth2/scope-profile", 341 "goauthentik.io/providers/oauth2/scope-offline_access", 342 ] 343 ) 344 ) 345 # Needs to be assigned to an application for iss to be set 346 self.app.provider = provider 347 self.app.save() 348 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 349 user = create_test_admin_user() 350 token: RefreshToken = RefreshToken.objects.create( 351 provider=provider, 352 user=user, 353 token=generate_id(), 354 _id_token=dumps({}), 355 auth_time=timezone.now(), 356 _scope="offline_access", 357 ) 358 # Create initial refresh token 359 response = self.client.post( 360 reverse("authentik_providers_oauth2:token"), 361 data={ 362 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 363 "refresh_token": token.token, 364 "redirect_uri": "http://testserver", 365 }, 366 HTTP_AUTHORIZATION=f"Basic {header}", 367 ) 368 new_token: RefreshToken = ( 369 RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first() 370 ) 371 # Post again with initial token -> get new refresh token 372 # and revoke old one 373 response = self.client.post( 374 reverse("authentik_providers_oauth2:token"), 375 data={ 376 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 377 "refresh_token": new_token.token, 378 "redirect_uri": "http://local.invalid", 379 }, 380 HTTP_AUTHORIZATION=f"Basic {header}", 381 ) 382 self.assertEqual(response.status_code, 200) 383 # Post again with old token, is now revoked and should error 384 response = self.client.post( 385 reverse("authentik_providers_oauth2:token"), 386 data={ 387 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 388 "refresh_token": new_token.token, 389 "redirect_uri": "http://local.invalid", 390 }, 391 HTTP_AUTHORIZATION=f"Basic {header}", 392 ) 393 self.assertEqual(response.status_code, 400) 394 self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists()) 395 396 @apply_blueprint("system/providers-oauth2.yaml") 397 def test_refresh_token_view_threshold(self): 398 """test request param""" 399 provider = OAuth2Provider.objects.create( 400 name=generate_id(), 401 authorization_flow=create_test_flow(), 402 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 403 signing_key=self.keypair, 404 refresh_token_threshold="hours=1", # nosec 405 ) 406 provider.property_mappings.set( 407 ScopeMapping.objects.filter( 408 managed__in=[ 409 "goauthentik.io/providers/oauth2/scope-openid", 410 "goauthentik.io/providers/oauth2/scope-email", 411 "goauthentik.io/providers/oauth2/scope-profile", 412 "goauthentik.io/providers/oauth2/scope-offline_access", 413 ] 414 ) 415 ) 416 # Needs to be assigned to an application for iss to be set 417 self.app.provider = provider 418 self.app.save() 419 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 420 user = create_test_admin_user() 421 token: RefreshToken = RefreshToken.objects.create( 422 provider=provider, 423 user=user, 424 token=generate_id(), 425 _id_token=dumps({}), 426 auth_time=timezone.now(), 427 _scope="offline_access", 428 ) 429 response = self.client.post( 430 reverse("authentik_providers_oauth2:token"), 431 data={ 432 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 433 "refresh_token": token.token, 434 "redirect_uri": "http://local.invalid", 435 }, 436 HTTP_AUTHORIZATION=f"Basic {header}", 437 HTTP_ORIGIN="http://local.invalid", 438 ) 439 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 440 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 441 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 442 self.assertJSONEqual( 443 response.content.decode(), 444 { 445 "access_token": access.token, 446 "token_type": TOKEN_TYPE, 447 "expires_in": 3600, 448 "id_token": provider.encode( 449 access.id_token.to_dict(), 450 ), 451 "scope": "offline_access", 452 }, 453 ) 454 self.validate_jwt(access, provider) 455 456 @apply_blueprint("system/providers-oauth2.yaml") 457 def test_scope_claim_override_via_property_mapping(self): 458 """Test that property mappings can override the scope claim in access tokens. 459 460 See: https://github.com/goauthentik/authentik/issues/19224 461 """ 462 # Create a custom scope mapping that returns a custom scope claim 463 custom_scope_mapping = ScopeMapping.objects.create( 464 name="custom-scope-override", 465 scope_name="custom", 466 expression='return {"scope": "custom-scope-value additional-scope"}', 467 ) 468 469 provider = OAuth2Provider.objects.create( 470 name=generate_id(), 471 authorization_flow=create_test_flow(), 472 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 473 signing_key=self.keypair, 474 include_claims_in_id_token=True, 475 ) 476 provider.property_mappings.add(custom_scope_mapping) 477 478 # Needs to be assigned to an application for iss to be set 479 self.app.provider = provider 480 self.app.save() 481 482 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 483 user = create_test_admin_user() 484 code = AuthorizationCode.objects.create( 485 code="foobar", 486 provider=provider, 487 user=user, 488 auth_time=timezone.now(), 489 _scope="openid custom", # Request the custom scope 490 ) 491 492 response = self.client.post( 493 reverse("authentik_providers_oauth2:token"), 494 data={ 495 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 496 "code": code.code, 497 "redirect_uri": "http://local.invalid", 498 }, 499 HTTP_AUTHORIZATION=f"Basic {header}", 500 ) 501 self.assertEqual(response.status_code, 200) 502 503 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 504 jwt_data = self.validate_jwt(access, provider) 505 506 # The scope should be the custom value from the property mapping, 507 # not the default "openid custom" 508 self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
37class TestToken(OAuthTestCase): 38 """Test token view""" 39 40 def setUp(self) -> None: 41 super().setUp() 42 self.factory = RequestFactory() 43 self.app = Application.objects.create(name=generate_id(), slug="test") 44 45 def test_request_auth_code(self): 46 """test request param""" 47 provider = OAuth2Provider.objects.create( 48 name=generate_id(), 49 authorization_flow=create_test_flow(), 50 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")], 51 signing_key=self.keypair, 52 ) 53 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 54 user = create_test_admin_user() 55 code = AuthorizationCode.objects.create( 56 code="foobar", provider=provider, user=user, auth_time=timezone.now() 57 ) 58 request = self.factory.post( 59 "/", 60 data={ 61 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 62 "code": code.code, 63 "redirect_uri": "http://TestServer", 64 }, 65 HTTP_AUTHORIZATION=f"Basic {header}", 66 ) 67 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 68 self.assertEqual(params.provider, provider) 69 with self.assertRaises(TokenError): 70 TokenParams.parse(request, provider, provider.client_id, generate_key()) 71 72 def test_request_auth_code_invalid(self): 73 """test request param""" 74 provider = OAuth2Provider.objects.create( 75 name=generate_id(), 76 authorization_flow=create_test_flow(), 77 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 78 signing_key=self.keypair, 79 ) 80 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 81 request = self.factory.post( 82 "/", 83 data={ 84 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 85 "code": "foo", 86 "redirect_uri": "http://testserver", 87 }, 88 HTTP_AUTHORIZATION=f"Basic {header}", 89 ) 90 with self.assertRaises(TokenError): 91 TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 92 93 def test_request_refresh_token(self): 94 """test request param""" 95 provider = OAuth2Provider.objects.create( 96 name=generate_id(), 97 authorization_flow=create_test_flow(), 98 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 99 signing_key=self.keypair, 100 ) 101 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 102 user = create_test_admin_user() 103 token: RefreshToken = RefreshToken.objects.create( 104 provider=provider, 105 user=user, 106 token=generate_id(), 107 auth_time=timezone.now(), 108 ) 109 request = self.factory.post( 110 "/", 111 data={ 112 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 113 "refresh_token": token.token, 114 "redirect_uri": "http://local.invalid", 115 }, 116 HTTP_AUTHORIZATION=f"Basic {header}", 117 ) 118 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 119 self.assertEqual(params.provider, provider) 120 121 def test_extract_client_auth_basic_auth_percent_decodes(self): 122 """test percent-decoding of client credentials in Basic auth""" 123 header = b64encode( 124 f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode() 125 ).decode() 126 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 127 self.assertEqual(extract_client_auth(request), ("client/id", "secret+/==")) 128 129 def test_extract_client_auth_basic_auth_preserves_raw_plus(self): 130 """test compatibility with clients that still send raw plus characters""" 131 header = b64encode(b"client:secret+plus").decode() 132 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 133 self.assertEqual(extract_client_auth(request), ("client", "secret+plus")) 134 135 def test_auth_code_view(self): 136 """test request param""" 137 provider = OAuth2Provider.objects.create( 138 name=generate_id(), 139 authorization_flow=create_test_flow(), 140 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 141 signing_key=self.keypair, 142 ) 143 # Needs to be assigned to an application for iss to be set 144 self.app.provider = provider 145 self.app.save() 146 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 147 user = create_test_admin_user() 148 code = AuthorizationCode.objects.create( 149 code="foobar", provider=provider, user=user, auth_time=timezone.now() 150 ) 151 response = self.client.post( 152 reverse("authentik_providers_oauth2:token"), 153 data={ 154 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 155 "code": code.code, 156 "redirect_uri": "http://local.invalid", 157 }, 158 HTTP_AUTHORIZATION=f"Basic {header}", 159 ) 160 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 161 self.assertJSONEqual( 162 response.content.decode(), 163 { 164 "access_token": access.token, 165 "token_type": TOKEN_TYPE, 166 "expires_in": 3600, 167 "id_token": provider.encode( 168 access.id_token.to_dict(), 169 ), 170 "scope": "", 171 }, 172 ) 173 self.validate_jwt(access, provider) 174 175 def test_auth_code_enc(self): 176 """test request param""" 177 provider = OAuth2Provider.objects.create( 178 name=generate_id(), 179 authorization_flow=create_test_flow(), 180 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 181 signing_key=self.keypair, 182 encryption_key=self.keypair, 183 ) 184 # Needs to be assigned to an application for iss to be set 185 self.app.provider = provider 186 self.app.save() 187 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 188 user = create_test_admin_user() 189 code = AuthorizationCode.objects.create( 190 code="foobar", provider=provider, user=user, auth_time=timezone.now() 191 ) 192 response = self.client.post( 193 reverse("authentik_providers_oauth2:token"), 194 data={ 195 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 196 "code": code.code, 197 "redirect_uri": "http://local.invalid", 198 }, 199 HTTP_AUTHORIZATION=f"Basic {header}", 200 ) 201 self.assertEqual(response.status_code, 200) 202 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 203 self.validate_jwe(access, provider) 204 205 @apply_blueprint("system/providers-oauth2.yaml") 206 def test_refresh_token_view(self): 207 """test request param""" 208 provider = OAuth2Provider.objects.create( 209 name=generate_id(), 210 authorization_flow=create_test_flow(), 211 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 212 signing_key=self.keypair, 213 ) 214 provider.property_mappings.set( 215 ScopeMapping.objects.filter( 216 managed__in=[ 217 "goauthentik.io/providers/oauth2/scope-openid", 218 "goauthentik.io/providers/oauth2/scope-email", 219 "goauthentik.io/providers/oauth2/scope-profile", 220 "goauthentik.io/providers/oauth2/scope-offline_access", 221 ] 222 ) 223 ) 224 # Needs to be assigned to an application for iss to be set 225 self.app.provider = provider 226 self.app.save() 227 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 228 user = create_test_admin_user() 229 token: RefreshToken = RefreshToken.objects.create( 230 provider=provider, 231 user=user, 232 token=generate_id(), 233 _id_token=dumps({}), 234 auth_time=timezone.now(), 235 _scope="offline_access", 236 ) 237 response = self.client.post( 238 reverse("authentik_providers_oauth2:token"), 239 data={ 240 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 241 "refresh_token": token.token, 242 "redirect_uri": "http://local.invalid", 243 }, 244 HTTP_AUTHORIZATION=f"Basic {header}", 245 HTTP_ORIGIN="http://local.invalid", 246 ) 247 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 248 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 249 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 250 refresh: RefreshToken = RefreshToken.objects.filter( 251 user=user, provider=provider, revoked=False 252 ).first() 253 self.assertJSONEqual( 254 response.content.decode(), 255 { 256 "access_token": access.token, 257 "refresh_token": refresh.token, 258 "token_type": TOKEN_TYPE, 259 "expires_in": 3600, 260 "id_token": provider.encode( 261 access.id_token.to_dict(), 262 ), 263 "scope": "offline_access", 264 }, 265 ) 266 self.validate_jwt(access, provider) 267 268 @apply_blueprint("system/providers-oauth2.yaml") 269 def test_refresh_token_view_invalid_origin(self): 270 """test request param""" 271 provider = OAuth2Provider.objects.create( 272 name=generate_id(), 273 authorization_flow=create_test_flow(), 274 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 275 signing_key=self.keypair, 276 ) 277 provider.property_mappings.set( 278 ScopeMapping.objects.filter( 279 managed__in=[ 280 "goauthentik.io/providers/oauth2/scope-openid", 281 "goauthentik.io/providers/oauth2/scope-email", 282 "goauthentik.io/providers/oauth2/scope-profile", 283 "goauthentik.io/providers/oauth2/scope-offline_access", 284 ] 285 ) 286 ) 287 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 288 user = create_test_admin_user() 289 token: RefreshToken = RefreshToken.objects.create( 290 provider=provider, 291 user=user, 292 token=generate_id(), 293 _id_token=dumps({}), 294 auth_time=timezone.now(), 295 _scope="offline_access", 296 ) 297 response = self.client.post( 298 reverse("authentik_providers_oauth2:token"), 299 data={ 300 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 301 "refresh_token": token.token, 302 "redirect_uri": "http://local.invalid", 303 }, 304 HTTP_AUTHORIZATION=f"Basic {header}", 305 HTTP_ORIGIN="http://another.invalid", 306 ) 307 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 308 refresh: RefreshToken = RefreshToken.objects.filter( 309 user=user, provider=provider, revoked=False 310 ).first() 311 self.assertNotIn("Access-Control-Allow-Credentials", response) 312 self.assertNotIn("Access-Control-Allow-Origin", response) 313 self.assertJSONEqual( 314 response.content.decode(), 315 { 316 "access_token": access.token, 317 "refresh_token": refresh.token, 318 "token_type": TOKEN_TYPE, 319 "expires_in": 3600, 320 "id_token": provider.encode( 321 access.id_token.to_dict(), 322 ), 323 "scope": "offline_access", 324 }, 325 ) 326 327 @apply_blueprint("system/providers-oauth2.yaml") 328 def test_refresh_token_revoke(self): 329 """test request param""" 330 provider = OAuth2Provider.objects.create( 331 name=generate_id(), 332 authorization_flow=create_test_flow(), 333 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 334 signing_key=self.keypair, 335 ) 336 provider.property_mappings.set( 337 ScopeMapping.objects.filter( 338 managed__in=[ 339 "goauthentik.io/providers/oauth2/scope-openid", 340 "goauthentik.io/providers/oauth2/scope-email", 341 "goauthentik.io/providers/oauth2/scope-profile", 342 "goauthentik.io/providers/oauth2/scope-offline_access", 343 ] 344 ) 345 ) 346 # Needs to be assigned to an application for iss to be set 347 self.app.provider = provider 348 self.app.save() 349 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 350 user = create_test_admin_user() 351 token: RefreshToken = RefreshToken.objects.create( 352 provider=provider, 353 user=user, 354 token=generate_id(), 355 _id_token=dumps({}), 356 auth_time=timezone.now(), 357 _scope="offline_access", 358 ) 359 # Create initial refresh token 360 response = self.client.post( 361 reverse("authentik_providers_oauth2:token"), 362 data={ 363 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 364 "refresh_token": token.token, 365 "redirect_uri": "http://testserver", 366 }, 367 HTTP_AUTHORIZATION=f"Basic {header}", 368 ) 369 new_token: RefreshToken = ( 370 RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first() 371 ) 372 # Post again with initial token -> get new refresh token 373 # and revoke old one 374 response = self.client.post( 375 reverse("authentik_providers_oauth2:token"), 376 data={ 377 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 378 "refresh_token": new_token.token, 379 "redirect_uri": "http://local.invalid", 380 }, 381 HTTP_AUTHORIZATION=f"Basic {header}", 382 ) 383 self.assertEqual(response.status_code, 200) 384 # Post again with old token, is now revoked and should error 385 response = self.client.post( 386 reverse("authentik_providers_oauth2:token"), 387 data={ 388 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 389 "refresh_token": new_token.token, 390 "redirect_uri": "http://local.invalid", 391 }, 392 HTTP_AUTHORIZATION=f"Basic {header}", 393 ) 394 self.assertEqual(response.status_code, 400) 395 self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists()) 396 397 @apply_blueprint("system/providers-oauth2.yaml") 398 def test_refresh_token_view_threshold(self): 399 """test request param""" 400 provider = OAuth2Provider.objects.create( 401 name=generate_id(), 402 authorization_flow=create_test_flow(), 403 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 404 signing_key=self.keypair, 405 refresh_token_threshold="hours=1", # nosec 406 ) 407 provider.property_mappings.set( 408 ScopeMapping.objects.filter( 409 managed__in=[ 410 "goauthentik.io/providers/oauth2/scope-openid", 411 "goauthentik.io/providers/oauth2/scope-email", 412 "goauthentik.io/providers/oauth2/scope-profile", 413 "goauthentik.io/providers/oauth2/scope-offline_access", 414 ] 415 ) 416 ) 417 # Needs to be assigned to an application for iss to be set 418 self.app.provider = provider 419 self.app.save() 420 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 421 user = create_test_admin_user() 422 token: RefreshToken = RefreshToken.objects.create( 423 provider=provider, 424 user=user, 425 token=generate_id(), 426 _id_token=dumps({}), 427 auth_time=timezone.now(), 428 _scope="offline_access", 429 ) 430 response = self.client.post( 431 reverse("authentik_providers_oauth2:token"), 432 data={ 433 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 434 "refresh_token": token.token, 435 "redirect_uri": "http://local.invalid", 436 }, 437 HTTP_AUTHORIZATION=f"Basic {header}", 438 HTTP_ORIGIN="http://local.invalid", 439 ) 440 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 441 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 442 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 443 self.assertJSONEqual( 444 response.content.decode(), 445 { 446 "access_token": access.token, 447 "token_type": TOKEN_TYPE, 448 "expires_in": 3600, 449 "id_token": provider.encode( 450 access.id_token.to_dict(), 451 ), 452 "scope": "offline_access", 453 }, 454 ) 455 self.validate_jwt(access, provider) 456 457 @apply_blueprint("system/providers-oauth2.yaml") 458 def test_scope_claim_override_via_property_mapping(self): 459 """Test that property mappings can override the scope claim in access tokens. 460 461 See: https://github.com/goauthentik/authentik/issues/19224 462 """ 463 # Create a custom scope mapping that returns a custom scope claim 464 custom_scope_mapping = ScopeMapping.objects.create( 465 name="custom-scope-override", 466 scope_name="custom", 467 expression='return {"scope": "custom-scope-value additional-scope"}', 468 ) 469 470 provider = OAuth2Provider.objects.create( 471 name=generate_id(), 472 authorization_flow=create_test_flow(), 473 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 474 signing_key=self.keypair, 475 include_claims_in_id_token=True, 476 ) 477 provider.property_mappings.add(custom_scope_mapping) 478 479 # Needs to be assigned to an application for iss to be set 480 self.app.provider = provider 481 self.app.save() 482 483 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 484 user = create_test_admin_user() 485 code = AuthorizationCode.objects.create( 486 code="foobar", 487 provider=provider, 488 user=user, 489 auth_time=timezone.now(), 490 _scope="openid custom", # Request the custom scope 491 ) 492 493 response = self.client.post( 494 reverse("authentik_providers_oauth2:token"), 495 data={ 496 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 497 "code": code.code, 498 "redirect_uri": "http://local.invalid", 499 }, 500 HTTP_AUTHORIZATION=f"Basic {header}", 501 ) 502 self.assertEqual(response.status_code, 200) 503 504 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 505 jwt_data = self.validate_jwt(access, provider) 506 507 # The scope should be the custom value from the property mapping, 508 # not the default "openid custom" 509 self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
Test token view
def
setUp(self) -> None:
40 def setUp(self) -> None: 41 super().setUp() 42 self.factory = RequestFactory() 43 self.app = Application.objects.create(name=generate_id(), slug="test")
Hook method for setting up the test fixture before exercising it.
def
test_request_auth_code(self):
45 def test_request_auth_code(self): 46 """test request param""" 47 provider = OAuth2Provider.objects.create( 48 name=generate_id(), 49 authorization_flow=create_test_flow(), 50 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")], 51 signing_key=self.keypair, 52 ) 53 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 54 user = create_test_admin_user() 55 code = AuthorizationCode.objects.create( 56 code="foobar", provider=provider, user=user, auth_time=timezone.now() 57 ) 58 request = self.factory.post( 59 "/", 60 data={ 61 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 62 "code": code.code, 63 "redirect_uri": "http://TestServer", 64 }, 65 HTTP_AUTHORIZATION=f"Basic {header}", 66 ) 67 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 68 self.assertEqual(params.provider, provider) 69 with self.assertRaises(TokenError): 70 TokenParams.parse(request, provider, provider.client_id, generate_key())
test request param
def
test_request_auth_code_invalid(self):
72 def test_request_auth_code_invalid(self): 73 """test request param""" 74 provider = OAuth2Provider.objects.create( 75 name=generate_id(), 76 authorization_flow=create_test_flow(), 77 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 78 signing_key=self.keypair, 79 ) 80 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 81 request = self.factory.post( 82 "/", 83 data={ 84 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 85 "code": "foo", 86 "redirect_uri": "http://testserver", 87 }, 88 HTTP_AUTHORIZATION=f"Basic {header}", 89 ) 90 with self.assertRaises(TokenError): 91 TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
test request param
def
test_request_refresh_token(self):
93 def test_request_refresh_token(self): 94 """test request param""" 95 provider = OAuth2Provider.objects.create( 96 name=generate_id(), 97 authorization_flow=create_test_flow(), 98 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 99 signing_key=self.keypair, 100 ) 101 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 102 user = create_test_admin_user() 103 token: RefreshToken = RefreshToken.objects.create( 104 provider=provider, 105 user=user, 106 token=generate_id(), 107 auth_time=timezone.now(), 108 ) 109 request = self.factory.post( 110 "/", 111 data={ 112 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 113 "refresh_token": token.token, 114 "redirect_uri": "http://local.invalid", 115 }, 116 HTTP_AUTHORIZATION=f"Basic {header}", 117 ) 118 params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret) 119 self.assertEqual(params.provider, provider)
test request param
def
test_extract_client_auth_basic_auth_percent_decodes(self):
121 def test_extract_client_auth_basic_auth_percent_decodes(self): 122 """test percent-decoding of client credentials in Basic auth""" 123 header = b64encode( 124 f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode() 125 ).decode() 126 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 127 self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))
test percent-decoding of client credentials in Basic auth
def
test_extract_client_auth_basic_auth_preserves_raw_plus(self):
129 def test_extract_client_auth_basic_auth_preserves_raw_plus(self): 130 """test compatibility with clients that still send raw plus characters""" 131 header = b64encode(b"client:secret+plus").decode() 132 request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}") 133 self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))
test compatibility with clients that still send raw plus characters
def
test_auth_code_view(self):
135 def test_auth_code_view(self): 136 """test request param""" 137 provider = OAuth2Provider.objects.create( 138 name=generate_id(), 139 authorization_flow=create_test_flow(), 140 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 141 signing_key=self.keypair, 142 ) 143 # Needs to be assigned to an application for iss to be set 144 self.app.provider = provider 145 self.app.save() 146 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 147 user = create_test_admin_user() 148 code = AuthorizationCode.objects.create( 149 code="foobar", provider=provider, user=user, auth_time=timezone.now() 150 ) 151 response = self.client.post( 152 reverse("authentik_providers_oauth2:token"), 153 data={ 154 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 155 "code": code.code, 156 "redirect_uri": "http://local.invalid", 157 }, 158 HTTP_AUTHORIZATION=f"Basic {header}", 159 ) 160 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 161 self.assertJSONEqual( 162 response.content.decode(), 163 { 164 "access_token": access.token, 165 "token_type": TOKEN_TYPE, 166 "expires_in": 3600, 167 "id_token": provider.encode( 168 access.id_token.to_dict(), 169 ), 170 "scope": "", 171 }, 172 ) 173 self.validate_jwt(access, provider)
test request param
def
test_auth_code_enc(self):
175 def test_auth_code_enc(self): 176 """test request param""" 177 provider = OAuth2Provider.objects.create( 178 name=generate_id(), 179 authorization_flow=create_test_flow(), 180 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 181 signing_key=self.keypair, 182 encryption_key=self.keypair, 183 ) 184 # Needs to be assigned to an application for iss to be set 185 self.app.provider = provider 186 self.app.save() 187 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 188 user = create_test_admin_user() 189 code = AuthorizationCode.objects.create( 190 code="foobar", provider=provider, user=user, auth_time=timezone.now() 191 ) 192 response = self.client.post( 193 reverse("authentik_providers_oauth2:token"), 194 data={ 195 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 196 "code": code.code, 197 "redirect_uri": "http://local.invalid", 198 }, 199 HTTP_AUTHORIZATION=f"Basic {header}", 200 ) 201 self.assertEqual(response.status_code, 200) 202 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 203 self.validate_jwe(access, provider)
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_view(self):
205 @apply_blueprint("system/providers-oauth2.yaml") 206 def test_refresh_token_view(self): 207 """test request param""" 208 provider = OAuth2Provider.objects.create( 209 name=generate_id(), 210 authorization_flow=create_test_flow(), 211 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 212 signing_key=self.keypair, 213 ) 214 provider.property_mappings.set( 215 ScopeMapping.objects.filter( 216 managed__in=[ 217 "goauthentik.io/providers/oauth2/scope-openid", 218 "goauthentik.io/providers/oauth2/scope-email", 219 "goauthentik.io/providers/oauth2/scope-profile", 220 "goauthentik.io/providers/oauth2/scope-offline_access", 221 ] 222 ) 223 ) 224 # Needs to be assigned to an application for iss to be set 225 self.app.provider = provider 226 self.app.save() 227 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 228 user = create_test_admin_user() 229 token: RefreshToken = RefreshToken.objects.create( 230 provider=provider, 231 user=user, 232 token=generate_id(), 233 _id_token=dumps({}), 234 auth_time=timezone.now(), 235 _scope="offline_access", 236 ) 237 response = self.client.post( 238 reverse("authentik_providers_oauth2:token"), 239 data={ 240 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 241 "refresh_token": token.token, 242 "redirect_uri": "http://local.invalid", 243 }, 244 HTTP_AUTHORIZATION=f"Basic {header}", 245 HTTP_ORIGIN="http://local.invalid", 246 ) 247 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 248 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 249 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 250 refresh: RefreshToken = RefreshToken.objects.filter( 251 user=user, provider=provider, revoked=False 252 ).first() 253 self.assertJSONEqual( 254 response.content.decode(), 255 { 256 "access_token": access.token, 257 "refresh_token": refresh.token, 258 "token_type": TOKEN_TYPE, 259 "expires_in": 3600, 260 "id_token": provider.encode( 261 access.id_token.to_dict(), 262 ), 263 "scope": "offline_access", 264 }, 265 ) 266 self.validate_jwt(access, provider)
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_view_invalid_origin(self):
268 @apply_blueprint("system/providers-oauth2.yaml") 269 def test_refresh_token_view_invalid_origin(self): 270 """test request param""" 271 provider = OAuth2Provider.objects.create( 272 name=generate_id(), 273 authorization_flow=create_test_flow(), 274 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 275 signing_key=self.keypair, 276 ) 277 provider.property_mappings.set( 278 ScopeMapping.objects.filter( 279 managed__in=[ 280 "goauthentik.io/providers/oauth2/scope-openid", 281 "goauthentik.io/providers/oauth2/scope-email", 282 "goauthentik.io/providers/oauth2/scope-profile", 283 "goauthentik.io/providers/oauth2/scope-offline_access", 284 ] 285 ) 286 ) 287 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 288 user = create_test_admin_user() 289 token: RefreshToken = RefreshToken.objects.create( 290 provider=provider, 291 user=user, 292 token=generate_id(), 293 _id_token=dumps({}), 294 auth_time=timezone.now(), 295 _scope="offline_access", 296 ) 297 response = self.client.post( 298 reverse("authentik_providers_oauth2:token"), 299 data={ 300 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 301 "refresh_token": token.token, 302 "redirect_uri": "http://local.invalid", 303 }, 304 HTTP_AUTHORIZATION=f"Basic {header}", 305 HTTP_ORIGIN="http://another.invalid", 306 ) 307 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 308 refresh: RefreshToken = RefreshToken.objects.filter( 309 user=user, provider=provider, revoked=False 310 ).first() 311 self.assertNotIn("Access-Control-Allow-Credentials", response) 312 self.assertNotIn("Access-Control-Allow-Origin", response) 313 self.assertJSONEqual( 314 response.content.decode(), 315 { 316 "access_token": access.token, 317 "refresh_token": refresh.token, 318 "token_type": TOKEN_TYPE, 319 "expires_in": 3600, 320 "id_token": provider.encode( 321 access.id_token.to_dict(), 322 ), 323 "scope": "offline_access", 324 }, 325 )
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_revoke(self):
327 @apply_blueprint("system/providers-oauth2.yaml") 328 def test_refresh_token_revoke(self): 329 """test request param""" 330 provider = OAuth2Provider.objects.create( 331 name=generate_id(), 332 authorization_flow=create_test_flow(), 333 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 334 signing_key=self.keypair, 335 ) 336 provider.property_mappings.set( 337 ScopeMapping.objects.filter( 338 managed__in=[ 339 "goauthentik.io/providers/oauth2/scope-openid", 340 "goauthentik.io/providers/oauth2/scope-email", 341 "goauthentik.io/providers/oauth2/scope-profile", 342 "goauthentik.io/providers/oauth2/scope-offline_access", 343 ] 344 ) 345 ) 346 # Needs to be assigned to an application for iss to be set 347 self.app.provider = provider 348 self.app.save() 349 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 350 user = create_test_admin_user() 351 token: RefreshToken = RefreshToken.objects.create( 352 provider=provider, 353 user=user, 354 token=generate_id(), 355 _id_token=dumps({}), 356 auth_time=timezone.now(), 357 _scope="offline_access", 358 ) 359 # Create initial refresh token 360 response = self.client.post( 361 reverse("authentik_providers_oauth2:token"), 362 data={ 363 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 364 "refresh_token": token.token, 365 "redirect_uri": "http://testserver", 366 }, 367 HTTP_AUTHORIZATION=f"Basic {header}", 368 ) 369 new_token: RefreshToken = ( 370 RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first() 371 ) 372 # Post again with initial token -> get new refresh token 373 # and revoke old one 374 response = self.client.post( 375 reverse("authentik_providers_oauth2:token"), 376 data={ 377 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 378 "refresh_token": new_token.token, 379 "redirect_uri": "http://local.invalid", 380 }, 381 HTTP_AUTHORIZATION=f"Basic {header}", 382 ) 383 self.assertEqual(response.status_code, 200) 384 # Post again with old token, is now revoked and should error 385 response = self.client.post( 386 reverse("authentik_providers_oauth2:token"), 387 data={ 388 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 389 "refresh_token": new_token.token, 390 "redirect_uri": "http://local.invalid", 391 }, 392 HTTP_AUTHORIZATION=f"Basic {header}", 393 ) 394 self.assertEqual(response.status_code, 400) 395 self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_refresh_token_view_threshold(self):
397 @apply_blueprint("system/providers-oauth2.yaml") 398 def test_refresh_token_view_threshold(self): 399 """test request param""" 400 provider = OAuth2Provider.objects.create( 401 name=generate_id(), 402 authorization_flow=create_test_flow(), 403 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 404 signing_key=self.keypair, 405 refresh_token_threshold="hours=1", # nosec 406 ) 407 provider.property_mappings.set( 408 ScopeMapping.objects.filter( 409 managed__in=[ 410 "goauthentik.io/providers/oauth2/scope-openid", 411 "goauthentik.io/providers/oauth2/scope-email", 412 "goauthentik.io/providers/oauth2/scope-profile", 413 "goauthentik.io/providers/oauth2/scope-offline_access", 414 ] 415 ) 416 ) 417 # Needs to be assigned to an application for iss to be set 418 self.app.provider = provider 419 self.app.save() 420 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 421 user = create_test_admin_user() 422 token: RefreshToken = RefreshToken.objects.create( 423 provider=provider, 424 user=user, 425 token=generate_id(), 426 _id_token=dumps({}), 427 auth_time=timezone.now(), 428 _scope="offline_access", 429 ) 430 response = self.client.post( 431 reverse("authentik_providers_oauth2:token"), 432 data={ 433 "grant_type": GRANT_TYPE_REFRESH_TOKEN, 434 "refresh_token": token.token, 435 "redirect_uri": "http://local.invalid", 436 }, 437 HTTP_AUTHORIZATION=f"Basic {header}", 438 HTTP_ORIGIN="http://local.invalid", 439 ) 440 self.assertEqual(response["Access-Control-Allow-Credentials"], "true") 441 self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid") 442 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 443 self.assertJSONEqual( 444 response.content.decode(), 445 { 446 "access_token": access.token, 447 "token_type": TOKEN_TYPE, 448 "expires_in": 3600, 449 "id_token": provider.encode( 450 access.id_token.to_dict(), 451 ), 452 "scope": "offline_access", 453 }, 454 ) 455 self.validate_jwt(access, provider)
test request param
@apply_blueprint('system/providers-oauth2.yaml')
def
test_scope_claim_override_via_property_mapping(self):
457 @apply_blueprint("system/providers-oauth2.yaml") 458 def test_scope_claim_override_via_property_mapping(self): 459 """Test that property mappings can override the scope claim in access tokens. 460 461 See: https://github.com/goauthentik/authentik/issues/19224 462 """ 463 # Create a custom scope mapping that returns a custom scope claim 464 custom_scope_mapping = ScopeMapping.objects.create( 465 name="custom-scope-override", 466 scope_name="custom", 467 expression='return {"scope": "custom-scope-value additional-scope"}', 468 ) 469 470 provider = OAuth2Provider.objects.create( 471 name=generate_id(), 472 authorization_flow=create_test_flow(), 473 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")], 474 signing_key=self.keypair, 475 include_claims_in_id_token=True, 476 ) 477 provider.property_mappings.add(custom_scope_mapping) 478 479 # Needs to be assigned to an application for iss to be set 480 self.app.provider = provider 481 self.app.save() 482 483 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 484 user = create_test_admin_user() 485 code = AuthorizationCode.objects.create( 486 code="foobar", 487 provider=provider, 488 user=user, 489 auth_time=timezone.now(), 490 _scope="openid custom", # Request the custom scope 491 ) 492 493 response = self.client.post( 494 reverse("authentik_providers_oauth2:token"), 495 data={ 496 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 497 "code": code.code, 498 "redirect_uri": "http://local.invalid", 499 }, 500 HTTP_AUTHORIZATION=f"Basic {header}", 501 ) 502 self.assertEqual(response.status_code, 200) 503 504 access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first() 505 jwt_data = self.validate_jwt(access, provider) 506 507 # The scope should be the custom value from the property mapping, 508 # not the default "openid custom" 509 self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
Test that property mappings can override the scope claim in access tokens.