authentik.providers.oauth2.tests.test_token_cc_jwt_source
Test token view
"""Test token view"""

from datetime import datetime, timedelta
from json import loads

from django.test import RequestFactory
from django.urls import reverse
from jwt import decode

from authentik.blueprints.tests import apply_blueprint
from authentik.common.oauth.constants import (
    GRANT_TYPE_CLIENT_CREDENTIALS,
    SCOPE_OPENID,
    SCOPE_OPENID_EMAIL,
    SCOPE_OPENID_PROFILE,
    TOKEN_TYPE,
)
from authentik.core.models import USERNAME_MAX_LENGTH, Application, Group, User
from authentik.core.tests.utils import create_test_cert, create_test_flow
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
    GrantType,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.views.jwks import JWKSView
from authentik.sources.oauth.models import OAuthSource, OAuthSourcePropertyMapping


class TestTokenClientCredentialsJWTSource(OAuthTestCase):
    """Test token (client_credentials, with JWT) view

    Every test posts a `client_assertion` to the token endpoint on behalf of
    `self.provider`. Assertions are signed by `self.helper_provider`, whose signing
    key is the only key in the JWKS of the federated OAuth source `self.source`."""

    _JWT_BEARER = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"

    @apply_blueprint("system/providers-oauth2.yaml")
    def setUp(self) -> None:
        """Create the signing helper, the OAuth source and the provider under test"""
        super().setUp()
        self.factory = RequestFactory()
        self.other_cert = create_test_cert()
        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
        self.helper_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            signing_key=self.other_cert,
        )
        self.cert = create_test_cert()

        # Static JWKS for the source, built from the helper provider's signing key
        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
        self.source: OAuthSource = OAuthSource.objects.create(
            name=generate_id(),
            slug=generate_id(),
            provider_type="openidconnect",
            consumer_key=generate_id(),
            consumer_secret=generate_id(),
            authorization_url="http://foo",
            access_token_url=f"http://{generate_id()}",
            profile_url="http://foo",
            oidc_well_known_url="",
            oidc_jwks_url="",
            oidc_jwks={
                "keys": [jwk],
            },
        )

        # Provider under test: signs with its own key and federates with the source
        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
            name="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
            signing_key=self.cert,
            grant_types=[GrantType.CLIENT_CREDENTIALS],
        )
        self.provider.jwt_federation_sources.add(self.source)
        self.provider.property_mappings.set(ScopeMapping.objects.all())
        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

    def _signed_assertion(self, lifetime: timedelta = timedelta(hours=2), **claims) -> str:
        """Sign a JWT for subject "foo" with the helper provider's key

        `lifetime` is added to the current time to form the `exp` claim, so a negative
        value yields an already expired token. `claims` are added to the payload."""
        payload = {"sub": "foo", **claims}
        # Timezone-aware, so the claim does not depend on the host's UTC offset.
        # NOTE(review): assumes `encode` hands the payload to PyJWT, which converts a
        # naive datetime as if it were UTC
        payload["exp"] = datetime.now().astimezone() + lifetime
        return self.helper_provider.encode(payload)

    def _request_token(self, client_assertion: str, client_assertion_type: str = _JWT_BEARER):
        """POST a client_credentials request with the given assertion to the token view"""
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            {
                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
                "client_id": self.provider.client_id,
                "client_assertion_type": client_assertion_type,
                "client_assertion": client_assertion,
            },
        )

    def _assert_invalid_grant(self, response) -> None:
        """Assert that the response is a 400 with error `invalid_grant`"""
        self.assertEqual(response.status_code, 400)
        body = loads(response.content.decode())
        self.assertEqual(body["error"], "invalid_grant")

    def _decode_access_token(self, response) -> dict:
        """Check the token type and return the verified claims of the access token"""
        body = loads(response.content.decode())
        self.assertEqual(body["token_type"], TOKEN_TYPE)
        key_obj, alg = self.provider.jwt_key
        return decode(
            body["access_token"],
            key=key_obj.public_key(),
            algorithms=[alg],
            audience=self.provider.client_id,
        )

    def test_invalid_type(self):
        """test invalid type"""
        response = self._request_token("foo.bar", client_assertion_type="foo")
        self._assert_invalid_grant(response)

    def test_invalid_jwt(self):
        """test invalid JWT"""
        response = self._request_token("foo.bar")
        self._assert_invalid_grant(response)

    def test_invalid_signature(self):
        """test JWT with a tampered signature"""
        token = self._signed_assertion()
        response = self._request_token(token + "foo")
        self._assert_invalid_grant(response)

    def test_invalid_expired(self):
        """test JWT whose exp claim lies in the past"""
        token = self._signed_assertion(lifetime=-timedelta(hours=2))
        response = self._request_token(token)
        self._assert_invalid_grant(response)

    def test_invalid_no_app(self):
        """test valid JWT for a provider that has no application"""
        self.app.provider = None
        self.app.save()
        response = self._request_token(self._signed_assertion())
        self._assert_invalid_grant(response)

    def test_invalid_access_denied(self):
        """test valid JWT for an application that has a group bound to it"""
        group = Group.objects.create(name="foo")
        PolicyBinding.objects.create(
            group=group,
            target=self.app,
            order=0,
        )
        response = self._request_token(self._signed_assertion())
        self._assert_invalid_grant(response)

    def test_successful(self):
        """test successful"""
        response = self._request_token(self._signed_assertion())
        self.assertEqual(response.status_code, 200)

        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
        self.assertIsNotNone(user)

        claims = self._decode_access_token(response)
        self.assertEqual(
            claims["given_name"], "Autogenerated user from application test (client credentials JWT)"
        )
        self.assertEqual(claims["preferred_username"], "test-foo")

    def test_successful_mapping(self):
        """test successful, with a source property mapping supplying the user attributes"""
        raw_username = "mapped-foo" + ("a" * 150)
        # The asserted username is the raw claim cut to the maximum username length
        test_username = raw_username[:USERNAME_MAX_LENGTH]
        mapping = OAuthSourcePropertyMapping.objects.create(
            name="test-mapping",
            expression="""return {
                "email": oauth_userinfo.get("email"),
                "name": oauth_userinfo.get("name"),
                "username": oauth_userinfo.get("username"),
            }""",
        )
        self.source.user_property_mappings.add(mapping)

        token = self._signed_assertion(
            email="test-user@example.com",
            name="Mapped Test User",
            username=raw_username,
        )
        response = self._request_token(token)
        self.assertEqual(response.status_code, 200)

        user = User.objects.filter(username=test_username).first()
        self.assertIsNotNone(user)

        claims = self._decode_access_token(response)
        self.assertEqual(claims["email"], "test-user@example.com")
        self.assertEqual(claims["given_name"], "Mapped Test User")
        self.assertEqual(claims["preferred_username"], test_username)
class TestTokenClientCredentialsJWTSource(OAuthTestCase):
    """Test token (client_credentials, with JWT) view

    Every test posts a `client_assertion` to the token endpoint on behalf of
    `self.provider`. Assertions are signed by `self.helper_provider`, whose signing
    key is the only key in the JWKS of the federated OAuth source `self.source`."""

    _JWT_BEARER = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"

    @apply_blueprint("system/providers-oauth2.yaml")
    def setUp(self) -> None:
        """Create the signing helper, the OAuth source and the provider under test"""
        super().setUp()
        self.factory = RequestFactory()
        self.other_cert = create_test_cert()
        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
        self.helper_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            signing_key=self.other_cert,
        )
        self.cert = create_test_cert()

        # Static JWKS for the source, built from the helper provider's signing key
        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
        self.source: OAuthSource = OAuthSource.objects.create(
            name=generate_id(),
            slug=generate_id(),
            provider_type="openidconnect",
            consumer_key=generate_id(),
            consumer_secret=generate_id(),
            authorization_url="http://foo",
            access_token_url=f"http://{generate_id()}",
            profile_url="http://foo",
            oidc_well_known_url="",
            oidc_jwks_url="",
            oidc_jwks={
                "keys": [jwk],
            },
        )

        # Provider under test: signs with its own key and federates with the source
        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
            name="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
            signing_key=self.cert,
            grant_types=[GrantType.CLIENT_CREDENTIALS],
        )
        self.provider.jwt_federation_sources.add(self.source)
        self.provider.property_mappings.set(ScopeMapping.objects.all())
        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

    def _signed_assertion(self, lifetime: timedelta = timedelta(hours=2), **claims) -> str:
        """Sign a JWT for subject "foo" with the helper provider's key

        `lifetime` is added to the current time to form the `exp` claim, so a negative
        value yields an already expired token. `claims` are added to the payload."""
        payload = {"sub": "foo", **claims}
        # Timezone-aware, so the claim does not depend on the host's UTC offset.
        # NOTE(review): assumes `encode` hands the payload to PyJWT, which converts a
        # naive datetime as if it were UTC
        payload["exp"] = datetime.now().astimezone() + lifetime
        return self.helper_provider.encode(payload)

    def _request_token(self, client_assertion: str, client_assertion_type: str = _JWT_BEARER):
        """POST a client_credentials request with the given assertion to the token view"""
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            {
                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
                "client_id": self.provider.client_id,
                "client_assertion_type": client_assertion_type,
                "client_assertion": client_assertion,
            },
        )

    def _assert_invalid_grant(self, response) -> None:
        """Assert that the response is a 400 with error `invalid_grant`"""
        self.assertEqual(response.status_code, 400)
        body = loads(response.content.decode())
        self.assertEqual(body["error"], "invalid_grant")

    def _decode_access_token(self, response) -> dict:
        """Check the token type and return the verified claims of the access token"""
        body = loads(response.content.decode())
        self.assertEqual(body["token_type"], TOKEN_TYPE)
        key_obj, alg = self.provider.jwt_key
        return decode(
            body["access_token"],
            key=key_obj.public_key(),
            algorithms=[alg],
            audience=self.provider.client_id,
        )

    def test_invalid_type(self):
        """test invalid type"""
        response = self._request_token("foo.bar", client_assertion_type="foo")
        self._assert_invalid_grant(response)

    def test_invalid_jwt(self):
        """test invalid JWT"""
        response = self._request_token("foo.bar")
        self._assert_invalid_grant(response)

    def test_invalid_signature(self):
        """test JWT with a tampered signature"""
        token = self._signed_assertion()
        response = self._request_token(token + "foo")
        self._assert_invalid_grant(response)

    def test_invalid_expired(self):
        """test JWT whose exp claim lies in the past"""
        token = self._signed_assertion(lifetime=-timedelta(hours=2))
        response = self._request_token(token)
        self._assert_invalid_grant(response)

    def test_invalid_no_app(self):
        """test valid JWT for a provider that has no application"""
        self.app.provider = None
        self.app.save()
        response = self._request_token(self._signed_assertion())
        self._assert_invalid_grant(response)

    def test_invalid_access_denied(self):
        """test valid JWT for an application that has a group bound to it"""
        group = Group.objects.create(name="foo")
        PolicyBinding.objects.create(
            group=group,
            target=self.app,
            order=0,
        )
        response = self._request_token(self._signed_assertion())
        self._assert_invalid_grant(response)

    def test_successful(self):
        """test successful"""
        response = self._request_token(self._signed_assertion())
        self.assertEqual(response.status_code, 200)

        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
        self.assertIsNotNone(user)

        claims = self._decode_access_token(response)
        self.assertEqual(
            claims["given_name"], "Autogenerated user from application test (client credentials JWT)"
        )
        self.assertEqual(claims["preferred_username"], "test-foo")

    def test_successful_mapping(self):
        """test successful, with a source property mapping supplying the user attributes"""
        raw_username = "mapped-foo" + ("a" * 150)
        # The asserted username is the raw claim cut to the maximum username length
        test_username = raw_username[:USERNAME_MAX_LENGTH]
        mapping = OAuthSourcePropertyMapping.objects.create(
            name="test-mapping",
            expression="""return {
                "email": oauth_userinfo.get("email"),
                "name": oauth_userinfo.get("name"),
                "username": oauth_userinfo.get("username"),
            }""",
        )
        self.source.user_property_mappings.add(mapping)

        token = self._signed_assertion(
            email="test-user@example.com",
            name="Mapped Test User",
            username=raw_username,
        )
        response = self._request_token(token)
        self.assertEqual(response.status_code, 200)

        user = User.objects.filter(username=test_username).first()
        self.assertIsNotNone(user)

        claims = self._decode_access_token(response)
        self.assertEqual(claims["email"], "test-user@example.com")
        self.assertEqual(claims["given_name"], "Mapped Test User")
        self.assertEqual(claims["preferred_username"], test_username)
Test token (client_credentials, with JWT) view
@apply_blueprint('system/providers-oauth2.yaml')
def
setUp(self) -> None:
@apply_blueprint("system/providers-oauth2.yaml")
def setUp(self) -> None:
    """Create the signing helper, the OAuth source and the provider under test"""
    super().setUp()
    self.factory = RequestFactory()

    # Helper provider: only used to sign JWTs with the key the OAuth source knows
    self.other_cert = create_test_cert()
    helper_fields = {
        "name": generate_id(),
        "authorization_flow": create_test_flow(),
        "signing_key": self.other_cert,
    }
    self.helper_provider = OAuth2Provider.objects.create(**helper_fields)
    self.cert = create_test_cert()

    # OAuth source with a static JWKS holding the helper provider's signing key
    helper_jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
    source_fields = {
        "name": generate_id(),
        "slug": generate_id(),
        "provider_type": "openidconnect",
        "consumer_key": generate_id(),
        "consumer_secret": generate_id(),
        "authorization_url": "http://foo",
        "access_token_url": f"http://{generate_id()}",
        "profile_url": "http://foo",
        "oidc_well_known_url": "",
        "oidc_jwks_url": "",
        "oidc_jwks": {"keys": [helper_jwk]},
    }
    self.source: OAuthSource = OAuthSource.objects.create(**source_fields)

    # Provider under test: signs with its own key and federates with the source
    provider_fields = {
        "name": "test",
        "authorization_flow": create_test_flow(),
        "redirect_uris": [RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
        "signing_key": self.cert,
        "grant_types": [GrantType.CLIENT_CREDENTIALS],
    }
    self.provider: OAuth2Provider = OAuth2Provider.objects.create(**provider_fields)
    self.provider.jwt_federation_sources.add(self.source)
    self.provider.property_mappings.set(ScopeMapping.objects.all())

    self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
Hook method for setting up the test fixture before exercising it.
def
test_invalid_type(self):
def test_invalid_type(self):
    """An unknown client_assertion_type is answered with 400 / invalid_grant"""
    token_url = reverse("authentik_providers_oauth2:token")
    requested_scopes = f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}"
    form = {
        "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
        "scope": requested_scopes,
        "client_id": self.provider.client_id,
        # Deliberately not the JWT bearer assertion type
        "client_assertion_type": "foo",
        "client_assertion": "foo.bar",
    }
    response = self.client.post(token_url, form)

    self.assertEqual(response.status_code, 400)
    error_body = loads(response.content.decode())
    self.assertEqual(error_body["error"], "invalid_grant")
test invalid type
def
test_invalid_jwt(self):
def test_invalid_jwt(self):
    """An assertion that is not a well-formed JWT is answered with 400 / invalid_grant"""
    token_url = reverse("authentik_providers_oauth2:token")
    requested_scopes = f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}"
    form = {
        "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
        "scope": requested_scopes,
        "client_id": self.provider.client_id,
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        # Only two dot-separated segments, and neither is valid JWT content
        "client_assertion": "foo.bar",
    }
    response = self.client.post(token_url, form)

    self.assertEqual(response.status_code, 400)
    error_body = loads(response.content.decode())
    self.assertEqual(error_body["error"], "invalid_grant")
test invalid JWT
def
test_invalid_signature(self):
def test_invalid_signature(self):
    """A JWT whose signature was tampered with is answered with 400 / invalid_grant"""
    token = self.helper_provider.encode(
        {
            "sub": "foo",
            # Timezone-aware, so the claim does not depend on the host's UTC offset.
            # NOTE(review): assumes `encode` hands the payload to PyJWT, which
            # converts a naive datetime as if it were UTC
            "exp": datetime.now().astimezone() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            # The signature is the last JWT segment, so appending text corrupts it
            "client_assertion": token + "foo",
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test a JWT whose signature has been tampered with
def
test_invalid_expired(self):
def test_invalid_expired(self):
    """A JWT whose exp claim lies two hours in the past is answered with 400 / invalid_grant"""
    token = self.helper_provider.encode(
        {
            "sub": "foo",
            # Timezone-aware, so the token is expired regardless of the host's UTC
            # offset. NOTE(review): assumes `encode` hands the payload to PyJWT,
            # which converts a naive datetime as if it were UTC
            "exp": datetime.now().astimezone() - timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test a JWT whose exp claim lies in the past
def
test_invalid_no_app(self):
def test_invalid_no_app(self):
    """A valid JWT is answered with 400 / invalid_grant when the provider has no application"""
    # Detach the provider from its application before requesting a token
    self.app.provider = None
    self.app.save()
    token = self.helper_provider.encode(
        {
            "sub": "foo",
            # Timezone-aware, so the claim does not depend on the host's UTC offset.
            # NOTE(review): assumes `encode` hands the payload to PyJWT, which
            # converts a naive datetime as if it were UTC
            "exp": datetime.now().astimezone() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test a valid JWT for a provider that has no application
def
test_invalid_access_denied(self):
def test_invalid_access_denied(self):
    """A valid JWT is answered with 400 / invalid_grant when a group is bound to the app"""
    # Bind a freshly created group to the application
    # NOTE(review): presumably the user derived from the JWT is not a member, so the
    # binding denies access; confirm against the policy engine
    group = Group.objects.create(name="foo")
    PolicyBinding.objects.create(
        group=group,
        target=self.app,
        order=0,
    )
    token = self.helper_provider.encode(
        {
            "sub": "foo",
            # Timezone-aware, so the claim does not depend on the host's UTC offset.
            # NOTE(review): assumes `encode` hands the payload to PyJWT, which
            # converts a naive datetime as if it were UTC
            "exp": datetime.now().astimezone() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test a valid JWT for an application that has a group bound to it
def
test_successful(self):
def test_successful(self):
    """A valid JWT yields an access token for a user named after provider and subject"""
    token = self.helper_provider.encode(
        {
            "sub": "foo",
            # Timezone-aware, so the claim does not depend on the host's UTC offset.
            # NOTE(review): assumes `encode` hands the payload to PyJWT, which
            # converts a naive datetime as if it were UTC
            "exp": datetime.now().astimezone() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 200)

    # A user "<provider name>-<sub>" must exist after the request
    user = User.objects.filter(username=f"{self.provider.name}-foo").first()
    self.assertIsNotNone(user)

    body = loads(response.content.decode())
    self.assertEqual(body["token_type"], TOKEN_TYPE)
    _, alg = self.provider.jwt_key
    # Verify the access token against the provider's own signing key
    claims = decode(
        body["access_token"],
        key=self.provider.signing_key.public_key,
        algorithms=[alg],
        audience=self.provider.client_id,
    )
    self.assertEqual(
        claims["given_name"], "Autogenerated user from application test (client credentials JWT)"
    )
    self.assertEqual(claims["preferred_username"], "test-foo")
test successful
def
test_successful_mapping(self):
def test_successful_mapping(self):
    """A valid JWT yields a token whose user attributes come from a source property mapping"""
    raw_username = "mapped-foo" + ("a" * 150)
    # The asserted username is the raw claim cut to the maximum username length
    test_username = raw_username[:USERNAME_MAX_LENGTH]
    mapping = OAuthSourcePropertyMapping.objects.create(
        name="test-mapping",
        expression="""return {
            "email": oauth_userinfo.get("email"),
            "name": oauth_userinfo.get("name"),
            "username": oauth_userinfo.get("username"),
        }""",
    )
    self.source.user_property_mappings.add(mapping)

    token = self.helper_provider.encode(
        {
            "sub": "foo",
            "email": "test-user@example.com",
            "name": "Mapped Test User",
            "username": raw_username,
            # Timezone-aware, so the claim does not depend on the host's UTC offset.
            # NOTE(review): assumes `encode` hands the payload to PyJWT, which
            # converts a naive datetime as if it were UTC
            "exp": datetime.now().astimezone() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 200)

    user = User.objects.filter(username=test_username).first()
    self.assertIsNotNone(user)

    body = loads(response.content.decode())
    self.assertEqual(body["token_type"], TOKEN_TYPE)
    key_obj, alg = self.provider.jwt_key
    # Verify the access token against the public half of the provider's JWT key
    claims = decode(
        body["access_token"],
        key=key_obj.public_key(),
        algorithms=[alg],
        audience=self.provider.client_id,
    )

    self.assertEqual(claims["email"], "test-user@example.com")
    self.assertEqual(claims["given_name"], "Mapped Test User")
    self.assertEqual(claims["preferred_username"], test_username)
Test successful, with a source property mapping supplying the user attributes