authentik.providers.oauth2.tests.test_token_cc_jwt_source
Test token view
1"""Test token view""" 2 3from datetime import datetime, timedelta 4from json import loads 5 6from django.test import RequestFactory 7from django.urls import reverse 8from jwt import decode 9 10from authentik.blueprints.tests import apply_blueprint 11from authentik.common.oauth.constants import ( 12 GRANT_TYPE_CLIENT_CREDENTIALS, 13 SCOPE_OPENID, 14 SCOPE_OPENID_EMAIL, 15 SCOPE_OPENID_PROFILE, 16 TOKEN_TYPE, 17) 18from authentik.core.models import USERNAME_MAX_LENGTH, Application, Group, User 19from authentik.core.tests.utils import create_test_cert, create_test_flow 20from authentik.lib.generators import generate_id 21from authentik.policies.models import PolicyBinding 22from authentik.providers.oauth2.models import ( 23 OAuth2Provider, 24 RedirectURI, 25 RedirectURIMatchingMode, 26 ScopeMapping, 27) 28from authentik.providers.oauth2.tests.utils import OAuthTestCase 29from authentik.providers.oauth2.views.jwks import JWKSView 30from authentik.sources.oauth.models import OAuthSource, OAuthSourcePropertyMapping 31 32 33class TestTokenClientCredentialsJWTSource(OAuthTestCase): 34 """Test token (client_credentials, with JWT) view""" 35 36 @apply_blueprint("system/providers-oauth2.yaml") 37 def setUp(self) -> None: 38 super().setUp() 39 self.factory = RequestFactory() 40 self.other_cert = create_test_cert() 41 # Provider used as a helper to sign JWTs with the same key as the OAuth source has 42 self.helper_provider = OAuth2Provider.objects.create( 43 name=generate_id(), 44 authorization_flow=create_test_flow(), 45 signing_key=self.other_cert, 46 ) 47 self.cert = create_test_cert() 48 49 jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig") 50 self.source: OAuthSource = OAuthSource.objects.create( 51 name=generate_id(), 52 slug=generate_id(), 53 provider_type="openidconnect", 54 consumer_key=generate_id(), 55 consumer_secret=generate_id(), 56 authorization_url="http://foo", 57 access_token_url=f"http://{generate_id()}", 58 profile_url="http://foo", 59 oidc_well_known_url="", 60 oidc_jwks_url="", 61 oidc_jwks={ 62 "keys": [jwk], 63 }, 64 ) 65 66 self.provider: OAuth2Provider = OAuth2Provider.objects.create( 67 name="test", 68 authorization_flow=create_test_flow(), 69 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 70 signing_key=self.cert, 71 ) 72 self.provider.jwt_federation_sources.add(self.source) 73 self.provider.property_mappings.set(ScopeMapping.objects.all()) 74 self.app = Application.objects.create(name="test", slug="test", provider=self.provider) 75 76 def test_invalid_type(self): 77 """test invalid type""" 78 response = self.client.post( 79 reverse("authentik_providers_oauth2:token"), 80 { 81 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 82 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 83 "client_id": self.provider.client_id, 84 "client_assertion_type": "foo", 85 "client_assertion": "foo.bar", 86 }, 87 ) 88 self.assertEqual(response.status_code, 400) 89 body = loads(response.content.decode()) 90 self.assertEqual(body["error"], "invalid_grant") 91 92 def test_invalid_jwt(self): 93 """test invalid JWT""" 94 response = self.client.post( 95 reverse("authentik_providers_oauth2:token"), 96 { 97 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 98 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 99 "client_id": self.provider.client_id, 100 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 101 "client_assertion": "foo.bar", 102 }, 103 ) 104 self.assertEqual(response.status_code, 400) 105 body = loads(response.content.decode()) 106 self.assertEqual(body["error"], "invalid_grant") 107 108 def test_invalid_signature(self): 109 """test invalid JWT""" 110 token = self.helper_provider.encode( 111 { 112 "sub": "foo", 113 "exp": datetime.now() + timedelta(hours=2), 114 } 115 ) 116 response = self.client.post( 117 reverse("authentik_providers_oauth2:token"), 118 { 119 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 120 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 121 "client_id": self.provider.client_id, 122 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 123 "client_assertion": token + "foo", 124 }, 125 ) 126 self.assertEqual(response.status_code, 400) 127 body = loads(response.content.decode()) 128 self.assertEqual(body["error"], "invalid_grant") 129 130 def test_invalid_expired(self): 131 """test invalid JWT""" 132 token = self.helper_provider.encode( 133 { 134 "sub": "foo", 135 "exp": datetime.now() - timedelta(hours=2), 136 } 137 ) 138 response = self.client.post( 139 reverse("authentik_providers_oauth2:token"), 140 { 141 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 142 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 143 "client_id": self.provider.client_id, 144 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 145 "client_assertion": token, 146 }, 147 ) 148 self.assertEqual(response.status_code, 400) 149 body = loads(response.content.decode()) 150 self.assertEqual(body["error"], "invalid_grant") 151 152 def test_invalid_no_app(self): 153 """test invalid JWT""" 154 self.app.provider = None 155 self.app.save() 156 token = self.helper_provider.encode( 157 { 158 "sub": "foo", 159 "exp": datetime.now() + timedelta(hours=2), 160 } 161 ) 162 response = self.client.post( 163 reverse("authentik_providers_oauth2:token"), 164 { 165 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 166 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 167 "client_id": self.provider.client_id, 168 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 169 "client_assertion": token, 170 }, 171 ) 172 self.assertEqual(response.status_code, 400) 173 body = loads(response.content.decode()) 174 self.assertEqual(body["error"], "invalid_grant") 175 176 def test_invalid_access_denied(self): 177 """test invalid JWT""" 178 group = Group.objects.create(name="foo") 179 PolicyBinding.objects.create( 180 group=group, 181 target=self.app, 182 order=0, 183 ) 184 token = self.helper_provider.encode( 185 { 186 "sub": "foo", 187 "exp": datetime.now() + timedelta(hours=2), 188 } 189 ) 190 response = self.client.post( 191 reverse("authentik_providers_oauth2:token"), 192 { 193 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 194 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 195 "client_id": self.provider.client_id, 196 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 197 "client_assertion": token, 198 }, 199 ) 200 self.assertEqual(response.status_code, 400) 201 body = loads(response.content.decode()) 202 self.assertEqual(body["error"], "invalid_grant") 203 204 def test_successful(self): 205 """test successful""" 206 token = self.helper_provider.encode( 207 { 208 "sub": "foo", 209 "exp": datetime.now() + timedelta(hours=2), 210 } 211 ) 212 response = self.client.post( 213 reverse("authentik_providers_oauth2:token"), 214 { 215 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 216 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 217 "client_id": self.provider.client_id, 218 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 219 "client_assertion": token, 220 }, 221 ) 222 self.assertEqual(response.status_code, 200) 223 224 user = User.objects.filter(username=f"{self.provider.name}-foo").first() 225 self.assertIsNotNone(user) 226 227 body = loads(response.content.decode()) 228 self.assertEqual(body["token_type"], TOKEN_TYPE) 229 _, alg = self.provider.jwt_key 230 jwt = decode( 231 body["access_token"], 232 key=self.provider.signing_key.public_key, 233 algorithms=[alg], 234 audience=self.provider.client_id, 235 ) 236 self.assertEqual( 237 jwt["given_name"], "Autogenerated user from application test (client credentials JWT)" 238 ) 239 self.assertEqual(jwt["preferred_username"], "test-foo") 240 241 def test_successful_mapping(self): 242 """test successful""" 243 test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH] 244 mapping = OAuthSourcePropertyMapping.objects.create( 245 name="test-mapping", 246 expression="""return { 247 "email": oauth_userinfo.get("email"), 248 "name": oauth_userinfo.get("name"), 249 "username": oauth_userinfo.get("username"), 250 }""", 251 ) 252 self.source.user_property_mappings.add(mapping) 253 254 token = self.helper_provider.encode( 255 { 256 "sub": "foo", 257 "email": "test-user@example.com", 258 "name": "Mapped Test User", 259 "username": "mapped-foo" + ("a" * 150), 260 "exp": datetime.now() + timedelta(hours=2), 261 } 262 ) 263 response = self.client.post( 264 reverse("authentik_providers_oauth2:token"), 265 { 266 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 267 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 268 "client_id": self.provider.client_id, 269 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 270 "client_assertion": token, 271 }, 272 ) 273 self.assertEqual(response.status_code, 200) 274 275 user = User.objects.filter(username=test_username).first() 276 self.assertIsNotNone(user) 277 278 body = loads(response.content.decode()) 279 self.assertEqual(body["token_type"], TOKEN_TYPE) 280 key_obj, alg = self.provider.jwt_key 281 jwt = decode( 282 body["access_token"], 283 key=key_obj.public_key(), 284 algorithms=[alg], 285 audience=self.provider.client_id, 286 ) 287 288 self.assertEqual(jwt["email"], "test-user@example.com") 289 self.assertEqual(jwt["given_name"], "Mapped Test User") 290 self.assertEqual(jwt["preferred_username"], test_username)
34class TestTokenClientCredentialsJWTSource(OAuthTestCase): 35 """Test token (client_credentials, with JWT) view""" 36 37 @apply_blueprint("system/providers-oauth2.yaml") 38 def setUp(self) -> None: 39 super().setUp() 40 self.factory = RequestFactory() 41 self.other_cert = create_test_cert() 42 # Provider used as a helper to sign JWTs with the same key as the OAuth source has 43 self.helper_provider = OAuth2Provider.objects.create( 44 name=generate_id(), 45 authorization_flow=create_test_flow(), 46 signing_key=self.other_cert, 47 ) 48 self.cert = create_test_cert() 49 50 jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig") 51 self.source: OAuthSource = OAuthSource.objects.create( 52 name=generate_id(), 53 slug=generate_id(), 54 provider_type="openidconnect", 55 consumer_key=generate_id(), 56 consumer_secret=generate_id(), 57 authorization_url="http://foo", 58 access_token_url=f"http://{generate_id()}", 59 profile_url="http://foo", 60 oidc_well_known_url="", 61 oidc_jwks_url="", 62 oidc_jwks={ 63 "keys": [jwk], 64 }, 65 ) 66 67 self.provider: OAuth2Provider = OAuth2Provider.objects.create( 68 name="test", 69 authorization_flow=create_test_flow(), 70 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 71 signing_key=self.cert, 72 ) 73 self.provider.jwt_federation_sources.add(self.source) 74 self.provider.property_mappings.set(ScopeMapping.objects.all()) 75 self.app = Application.objects.create(name="test", slug="test", provider=self.provider) 76 77 def test_invalid_type(self): 78 """test invalid type""" 79 response = self.client.post( 80 reverse("authentik_providers_oauth2:token"), 81 { 82 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 83 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 84 "client_id": self.provider.client_id, 85 "client_assertion_type": "foo", 86 "client_assertion": "foo.bar", 87 }, 88 ) 89 self.assertEqual(response.status_code, 400) 90 body = loads(response.content.decode()) 91 self.assertEqual(body["error"], "invalid_grant") 92 93 def test_invalid_jwt(self): 94 """test invalid JWT""" 95 response = self.client.post( 96 reverse("authentik_providers_oauth2:token"), 97 { 98 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 99 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 100 "client_id": self.provider.client_id, 101 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 102 "client_assertion": "foo.bar", 103 }, 104 ) 105 self.assertEqual(response.status_code, 400) 106 body = loads(response.content.decode()) 107 self.assertEqual(body["error"], "invalid_grant") 108 109 def test_invalid_signature(self): 110 """test invalid JWT""" 111 token = self.helper_provider.encode( 112 { 113 "sub": "foo", 114 "exp": datetime.now() + timedelta(hours=2), 115 } 116 ) 117 response = self.client.post( 118 reverse("authentik_providers_oauth2:token"), 119 { 120 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 121 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 122 "client_id": self.provider.client_id, 123 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 124 "client_assertion": token + "foo", 125 }, 126 ) 127 self.assertEqual(response.status_code, 400) 128 body = loads(response.content.decode()) 129 self.assertEqual(body["error"], "invalid_grant") 130 131 def test_invalid_expired(self): 132 """test invalid JWT""" 133 token = self.helper_provider.encode( 134 { 135 "sub": "foo", 136 "exp": datetime.now() - timedelta(hours=2), 137 } 138 ) 139 response = self.client.post( 140 reverse("authentik_providers_oauth2:token"), 141 { 142 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 143 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 144 "client_id": self.provider.client_id, 145 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 146 "client_assertion": token, 147 }, 148 ) 149 self.assertEqual(response.status_code, 400) 150 body = loads(response.content.decode()) 151 self.assertEqual(body["error"], "invalid_grant") 152 153 def test_invalid_no_app(self): 154 """test invalid JWT""" 155 self.app.provider = None 156 self.app.save() 157 token = self.helper_provider.encode( 158 { 159 "sub": "foo", 160 "exp": datetime.now() + timedelta(hours=2), 161 } 162 ) 163 response = self.client.post( 164 reverse("authentik_providers_oauth2:token"), 165 { 166 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 167 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 168 "client_id": self.provider.client_id, 169 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 170 "client_assertion": token, 171 }, 172 ) 173 self.assertEqual(response.status_code, 400) 174 body = loads(response.content.decode()) 175 self.assertEqual(body["error"], "invalid_grant") 176 177 def test_invalid_access_denied(self): 178 """test invalid JWT""" 179 group = Group.objects.create(name="foo") 180 PolicyBinding.objects.create( 181 group=group, 182 target=self.app, 183 order=0, 184 ) 185 token = self.helper_provider.encode( 186 { 187 "sub": "foo", 188 "exp": datetime.now() + timedelta(hours=2), 189 } 190 ) 191 response = self.client.post( 192 reverse("authentik_providers_oauth2:token"), 193 { 194 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 195 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 196 "client_id": self.provider.client_id, 197 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 198 "client_assertion": token, 199 }, 200 ) 201 self.assertEqual(response.status_code, 400) 202 body = loads(response.content.decode()) 203 self.assertEqual(body["error"], "invalid_grant") 204 205 def test_successful(self): 206 """test successful""" 207 token = self.helper_provider.encode( 208 { 209 "sub": "foo", 210 "exp": datetime.now() + timedelta(hours=2), 211 } 212 ) 213 response = self.client.post( 214 reverse("authentik_providers_oauth2:token"), 215 { 216 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 217 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 218 "client_id": self.provider.client_id, 219 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 220 "client_assertion": token, 221 }, 222 ) 223 self.assertEqual(response.status_code, 200) 224 225 user = User.objects.filter(username=f"{self.provider.name}-foo").first() 226 self.assertIsNotNone(user) 227 228 body = loads(response.content.decode()) 229 self.assertEqual(body["token_type"], TOKEN_TYPE) 230 _, alg = self.provider.jwt_key 231 jwt = decode( 232 body["access_token"], 233 key=self.provider.signing_key.public_key, 234 algorithms=[alg], 235 audience=self.provider.client_id, 236 ) 237 self.assertEqual( 238 jwt["given_name"], "Autogenerated user from application test (client credentials JWT)" 239 ) 240 self.assertEqual(jwt["preferred_username"], "test-foo") 241 242 def test_successful_mapping(self): 243 """test successful""" 244 test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH] 245 mapping = OAuthSourcePropertyMapping.objects.create( 246 name="test-mapping", 247 expression="""return { 248 "email": oauth_userinfo.get("email"), 249 "name": oauth_userinfo.get("name"), 250 "username": oauth_userinfo.get("username"), 251 }""", 252 ) 253 self.source.user_property_mappings.add(mapping) 254 255 token = self.helper_provider.encode( 256 { 257 "sub": "foo", 258 "email": "test-user@example.com", 259 "name": "Mapped Test User", 260 "username": "mapped-foo" + ("a" * 150), 261 "exp": datetime.now() + timedelta(hours=2), 262 } 263 ) 264 response = self.client.post( 265 reverse("authentik_providers_oauth2:token"), 266 { 267 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 268 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 269 "client_id": self.provider.client_id, 270 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 271 "client_assertion": token, 272 }, 273 ) 274 self.assertEqual(response.status_code, 200) 275 276 user = User.objects.filter(username=test_username).first() 277 self.assertIsNotNone(user) 278 279 body = loads(response.content.decode()) 280 self.assertEqual(body["token_type"], TOKEN_TYPE) 281 key_obj, alg = self.provider.jwt_key 282 jwt = decode( 283 body["access_token"], 284 key=key_obj.public_key(), 285 algorithms=[alg], 286 audience=self.provider.client_id, 287 ) 288 289 self.assertEqual(jwt["email"], "test-user@example.com") 290 self.assertEqual(jwt["given_name"], "Mapped Test User") 291 self.assertEqual(jwt["preferred_username"], test_username)
Test token (client_credentials, with JWT) view
@apply_blueprint('system/providers-oauth2.yaml')
def
setUp(self) -> None:
37 @apply_blueprint("system/providers-oauth2.yaml") 38 def setUp(self) -> None: 39 super().setUp() 40 self.factory = RequestFactory() 41 self.other_cert = create_test_cert() 42 # Provider used as a helper to sign JWTs with the same key as the OAuth source has 43 self.helper_provider = OAuth2Provider.objects.create( 44 name=generate_id(), 45 authorization_flow=create_test_flow(), 46 signing_key=self.other_cert, 47 ) 48 self.cert = create_test_cert() 49 50 jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig") 51 self.source: OAuthSource = OAuthSource.objects.create( 52 name=generate_id(), 53 slug=generate_id(), 54 provider_type="openidconnect", 55 consumer_key=generate_id(), 56 consumer_secret=generate_id(), 57 authorization_url="http://foo", 58 access_token_url=f"http://{generate_id()}", 59 profile_url="http://foo", 60 oidc_well_known_url="", 61 oidc_jwks_url="", 62 oidc_jwks={ 63 "keys": [jwk], 64 }, 65 ) 66 67 self.provider: OAuth2Provider = OAuth2Provider.objects.create( 68 name="test", 69 authorization_flow=create_test_flow(), 70 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")], 71 signing_key=self.cert, 72 ) 73 self.provider.jwt_federation_sources.add(self.source) 74 self.provider.property_mappings.set(ScopeMapping.objects.all()) 75 self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
Hook method for setting up the test fixture before exercising it.
def
test_invalid_type(self):
77 def test_invalid_type(self): 78 """test invalid type""" 79 response = self.client.post( 80 reverse("authentik_providers_oauth2:token"), 81 { 82 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 83 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 84 "client_id": self.provider.client_id, 85 "client_assertion_type": "foo", 86 "client_assertion": "foo.bar", 87 }, 88 ) 89 self.assertEqual(response.status_code, 400) 90 body = loads(response.content.decode()) 91 self.assertEqual(body["error"], "invalid_grant")
test invalid type
def
test_invalid_jwt(self):
93 def test_invalid_jwt(self): 94 """test invalid JWT""" 95 response = self.client.post( 96 reverse("authentik_providers_oauth2:token"), 97 { 98 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 99 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 100 "client_id": self.provider.client_id, 101 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 102 "client_assertion": "foo.bar", 103 }, 104 ) 105 self.assertEqual(response.status_code, 400) 106 body = loads(response.content.decode()) 107 self.assertEqual(body["error"], "invalid_grant")
test invalid JWT
def
test_invalid_signature(self):
109 def test_invalid_signature(self): 110 """test invalid JWT""" 111 token = self.helper_provider.encode( 112 { 113 "sub": "foo", 114 "exp": datetime.now() + timedelta(hours=2), 115 } 116 ) 117 response = self.client.post( 118 reverse("authentik_providers_oauth2:token"), 119 { 120 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 121 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 122 "client_id": self.provider.client_id, 123 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 124 "client_assertion": token + "foo", 125 }, 126 ) 127 self.assertEqual(response.status_code, 400) 128 body = loads(response.content.decode()) 129 self.assertEqual(body["error"], "invalid_grant")
test invalid JWT
def
test_invalid_expired(self):
131 def test_invalid_expired(self): 132 """test invalid JWT""" 133 token = self.helper_provider.encode( 134 { 135 "sub": "foo", 136 "exp": datetime.now() - timedelta(hours=2), 137 } 138 ) 139 response = self.client.post( 140 reverse("authentik_providers_oauth2:token"), 141 { 142 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 143 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 144 "client_id": self.provider.client_id, 145 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 146 "client_assertion": token, 147 }, 148 ) 149 self.assertEqual(response.status_code, 400) 150 body = loads(response.content.decode()) 151 self.assertEqual(body["error"], "invalid_grant")
test invalid JWT
def
test_invalid_no_app(self):
153 def test_invalid_no_app(self): 154 """test invalid JWT""" 155 self.app.provider = None 156 self.app.save() 157 token = self.helper_provider.encode( 158 { 159 "sub": "foo", 160 "exp": datetime.now() + timedelta(hours=2), 161 } 162 ) 163 response = self.client.post( 164 reverse("authentik_providers_oauth2:token"), 165 { 166 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 167 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 168 "client_id": self.provider.client_id, 169 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 170 "client_assertion": token, 171 }, 172 ) 173 self.assertEqual(response.status_code, 400) 174 body = loads(response.content.decode()) 175 self.assertEqual(body["error"], "invalid_grant")
test invalid JWT
def
test_invalid_access_denied(self):
177 def test_invalid_access_denied(self): 178 """test invalid JWT""" 179 group = Group.objects.create(name="foo") 180 PolicyBinding.objects.create( 181 group=group, 182 target=self.app, 183 order=0, 184 ) 185 token = self.helper_provider.encode( 186 { 187 "sub": "foo", 188 "exp": datetime.now() + timedelta(hours=2), 189 } 190 ) 191 response = self.client.post( 192 reverse("authentik_providers_oauth2:token"), 193 { 194 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 195 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 196 "client_id": self.provider.client_id, 197 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 198 "client_assertion": token, 199 }, 200 ) 201 self.assertEqual(response.status_code, 400) 202 body = loads(response.content.decode()) 203 self.assertEqual(body["error"], "invalid_grant")
test invalid JWT
def
test_successful(self):
205 def test_successful(self): 206 """test successful""" 207 token = self.helper_provider.encode( 208 { 209 "sub": "foo", 210 "exp": datetime.now() + timedelta(hours=2), 211 } 212 ) 213 response = self.client.post( 214 reverse("authentik_providers_oauth2:token"), 215 { 216 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 217 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 218 "client_id": self.provider.client_id, 219 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 220 "client_assertion": token, 221 }, 222 ) 223 self.assertEqual(response.status_code, 200) 224 225 user = User.objects.filter(username=f"{self.provider.name}-foo").first() 226 self.assertIsNotNone(user) 227 228 body = loads(response.content.decode()) 229 self.assertEqual(body["token_type"], TOKEN_TYPE) 230 _, alg = self.provider.jwt_key 231 jwt = decode( 232 body["access_token"], 233 key=self.provider.signing_key.public_key, 234 algorithms=[alg], 235 audience=self.provider.client_id, 236 ) 237 self.assertEqual( 238 jwt["given_name"], "Autogenerated user from application test (client credentials JWT)" 239 ) 240 self.assertEqual(jwt["preferred_username"], "test-foo")
test successful
def
test_successful_mapping(self):
242 def test_successful_mapping(self): 243 """test successful""" 244 test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH] 245 mapping = OAuthSourcePropertyMapping.objects.create( 246 name="test-mapping", 247 expression="""return { 248 "email": oauth_userinfo.get("email"), 249 "name": oauth_userinfo.get("name"), 250 "username": oauth_userinfo.get("username"), 251 }""", 252 ) 253 self.source.user_property_mappings.add(mapping) 254 255 token = self.helper_provider.encode( 256 { 257 "sub": "foo", 258 "email": "test-user@example.com", 259 "name": "Mapped Test User", 260 "username": "mapped-foo" + ("a" * 150), 261 "exp": datetime.now() + timedelta(hours=2), 262 } 263 ) 264 response = self.client.post( 265 reverse("authentik_providers_oauth2:token"), 266 { 267 "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS, 268 "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}", 269 "client_id": self.provider.client_id, 270 "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 271 "client_assertion": token, 272 }, 273 ) 274 self.assertEqual(response.status_code, 200) 275 276 user = User.objects.filter(username=test_username).first() 277 self.assertIsNotNone(user) 278 279 body = loads(response.content.decode()) 280 self.assertEqual(body["token_type"], TOKEN_TYPE) 281 key_obj, alg = self.provider.jwt_key 282 jwt = decode( 283 body["access_token"], 284 key=key_obj.public_key(), 285 algorithms=[alg], 286 audience=self.provider.client_id, 287 ) 288 289 self.assertEqual(jwt["email"], "test-user@example.com") 290 self.assertEqual(jwt["given_name"], "Mapped Test User") 291 self.assertEqual(jwt["preferred_username"], test_username)
test successful