authentik.providers.oauth2.tests.test_token_cc_jwt_provider
Test token view
"""Test token view"""

from datetime import timedelta
from json import loads

from django.test import RequestFactory
from django.urls import reverse
from django.utils.timezone import now
from jwt import decode

from authentik.blueprints.tests import apply_blueprint
from authentik.common.oauth.constants import (
    GRANT_TYPE_CLIENT_CREDENTIALS,
    SCOPE_OPENID,
    SCOPE_OPENID_EMAIL,
    SCOPE_OPENID_PROFILE,
    TOKEN_TYPE,
)
from authentik.core.models import Application, Group
from authentik.core.tests.utils import create_test_cert, create_test_flow, create_test_user
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
    AccessToken,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase


class TestTokenClientCredentialsJWTProvider(OAuthTestCase):
    """Test token (client_credentials, with JWT) view"""

    # client_assertion_type sent by every test except test_invalid_type
    JWT_BEARER_ASSERTION_TYPE = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"

    @apply_blueprint("system/providers-oauth2.yaml")
    def setUp(self) -> None:
        """Create two providers with applications; the second federates with the first."""
        super().setUp()
        self.factory = RequestFactory()
        self.other_cert = create_test_cert()
        self.cert = create_test_cert()

        # Provider registered below as a JWT federation provider of self.provider
        self.other_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            signing_key=self.other_cert,
        )
        self.other_provider.property_mappings.set(ScopeMapping.objects.all())
        # This application is only needed in the database; self.app is rebound below
        self.app = Application.objects.create(
            name=generate_id(), slug=generate_id(), provider=self.other_provider
        )

        # Provider whose client_id is used for every token request in this class
        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
            name="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
            signing_key=self.cert,
        )
        self.provider.jwt_federation_providers.add(self.other_provider)
        self.provider.property_mappings.set(ScopeMapping.objects.all())
        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

    def _encode_assertion(self, provider: OAuth2Provider, expires_in: timedelta) -> str:
        """Return a JWT signed by `provider` for subject "foo" expiring `expires_in` from now."""
        # Timezone-aware clock instead of a naive datetime.now(), so the expiry does
        # not depend on the host's local UTC offset (assumes USE_TZ is on -- confirm).
        return provider.encode({"sub": "foo", "exp": now() + expires_in})

    def _request_token(self, assertion: str, assertion_type: str = JWT_BEARER_ASSERTION_TYPE):
        """POST a client_credentials grant for self.provider carrying `assertion`."""
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            {
                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
                "client_id": self.provider.client_id,
                "client_assertion_type": assertion_type,
                "client_assertion": assertion,
            },
        )

    def _assert_invalid_grant(self, response) -> None:
        """Assert that `response` is an HTTP 400 whose JSON error is "invalid_grant"."""
        self.assertEqual(response.status_code, 400)
        body = loads(response.content.decode())
        self.assertEqual(body["error"], "invalid_grant")

    def test_invalid_type(self):
        """Test that an unknown client_assertion_type is rejected"""
        self._assert_invalid_grant(self._request_token("foo.bar", assertion_type="foo"))

    def test_invalid_jwt(self):
        """Test that an assertion which is not a well-formed JWT is rejected"""
        self._assert_invalid_grant(self._request_token("foo.bar"))

    def test_invalid_signature(self):
        """Test that an assertion with a corrupted signature is rejected"""
        token = self._encode_assertion(self.provider, timedelta(hours=2))
        # Appending to the encoded token alters its final (signature) segment
        self._assert_invalid_grant(self._request_token(token + "foo"))

    def test_invalid_expired(self):
        """Test that an assertion whose exp lies two hours in the past is rejected"""
        # NOTE(review): signed by self.provider, whereas test_successful signs with
        # self.other_provider and stores an AccessToken; confirm the rejection here
        # is really caused by the expiry and not by an earlier check.
        token = self._encode_assertion(self.provider, -timedelta(hours=2))
        self._assert_invalid_grant(self._request_token(token))

    def test_invalid_no_app(self):
        """Test that a request is rejected once the application is detached from the provider"""
        self.app.provider = None
        self.app.save()
        # NOTE(review): signed by self.provider rather than the federated provider;
        # confirm the missing application is what triggers the rejection.
        token = self._encode_assertion(self.provider, timedelta(hours=2))
        self._assert_invalid_grant(self._request_token(token))

    def test_invalid_access_denied(self):
        """Test that a request is rejected when the application has a group binding"""
        # Newly created group with no members added in this test
        group = Group.objects.create(name="foo")
        PolicyBinding.objects.create(
            group=group,
            target=self.app,
            order=0,
        )
        # NOTE(review): signed by self.provider rather than the federated provider;
        # confirm the policy binding is what triggers the rejection.
        token = self._encode_assertion(self.provider, timedelta(hours=2))
        self._assert_invalid_grant(self._request_token(token))

    def test_successful(self):
        """Test that a stored access token of the federated provider is accepted"""
        user = create_test_user()
        token = self._encode_assertion(self.other_provider, timedelta(hours=2))
        # Persist the assertion as an access token issued by the federated provider
        AccessToken.objects.create(
            provider=self.other_provider,
            token=token,
            user=user,
            auth_time=now(),
        )

        response = self._request_token(token)
        self.assertEqual(response.status_code, 200)
        body = loads(response.content.decode())
        self.assertEqual(body["token_type"], TOKEN_TYPE)
        # The issued token must verify against the key of the provider under test
        _, alg = self.provider.jwt_key
        jwt = decode(
            body["access_token"],
            key=self.provider.signing_key.public_key,
            algorithms=[alg],
            audience=self.provider.client_id,
        )
        self.assertEqual(jwt["given_name"], user.name)
        self.assertEqual(jwt["preferred_username"], user.username)
class TestTokenClientCredentialsJWTProvider(OAuthTestCase):
    """Test token (client_credentials, with JWT) view"""

    # client_assertion_type sent by every test except test_invalid_type
    JWT_BEARER_ASSERTION_TYPE = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"

    @apply_blueprint("system/providers-oauth2.yaml")
    def setUp(self) -> None:
        """Create two providers with applications; the second federates with the first."""
        super().setUp()
        self.factory = RequestFactory()
        self.other_cert = create_test_cert()
        self.cert = create_test_cert()

        # Provider registered below as a JWT federation provider of self.provider
        self.other_provider = OAuth2Provider.objects.create(
            name=generate_id(),
            authorization_flow=create_test_flow(),
            signing_key=self.other_cert,
        )
        self.other_provider.property_mappings.set(ScopeMapping.objects.all())
        # This application is only needed in the database; self.app is rebound below
        self.app = Application.objects.create(
            name=generate_id(), slug=generate_id(), provider=self.other_provider
        )

        # Provider whose client_id is used for every token request in this class
        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
            name="test",
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
            signing_key=self.cert,
        )
        self.provider.jwt_federation_providers.add(self.other_provider)
        self.provider.property_mappings.set(ScopeMapping.objects.all())
        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

    def _encode_assertion(self, provider: OAuth2Provider, expires_in: timedelta) -> str:
        """Return a JWT signed by `provider` for subject "foo" expiring `expires_in` from now."""
        # Timezone-aware clock instead of a naive datetime.now(), so the expiry does
        # not depend on the host's local UTC offset (assumes USE_TZ is on -- confirm).
        return provider.encode({"sub": "foo", "exp": now() + expires_in})

    def _request_token(self, assertion: str, assertion_type: str = JWT_BEARER_ASSERTION_TYPE):
        """POST a client_credentials grant for self.provider carrying `assertion`."""
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            {
                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
                "client_id": self.provider.client_id,
                "client_assertion_type": assertion_type,
                "client_assertion": assertion,
            },
        )

    def _assert_invalid_grant(self, response) -> None:
        """Assert that `response` is an HTTP 400 whose JSON error is "invalid_grant"."""
        self.assertEqual(response.status_code, 400)
        body = loads(response.content.decode())
        self.assertEqual(body["error"], "invalid_grant")

    def test_invalid_type(self):
        """Test that an unknown client_assertion_type is rejected"""
        self._assert_invalid_grant(self._request_token("foo.bar", assertion_type="foo"))

    def test_invalid_jwt(self):
        """Test that an assertion which is not a well-formed JWT is rejected"""
        self._assert_invalid_grant(self._request_token("foo.bar"))

    def test_invalid_signature(self):
        """Test that an assertion with a corrupted signature is rejected"""
        token = self._encode_assertion(self.provider, timedelta(hours=2))
        # Appending to the encoded token alters its final (signature) segment
        self._assert_invalid_grant(self._request_token(token + "foo"))

    def test_invalid_expired(self):
        """Test that an assertion whose exp lies two hours in the past is rejected"""
        # NOTE(review): signed by self.provider, whereas test_successful signs with
        # self.other_provider and stores an AccessToken; confirm the rejection here
        # is really caused by the expiry and not by an earlier check.
        token = self._encode_assertion(self.provider, -timedelta(hours=2))
        self._assert_invalid_grant(self._request_token(token))

    def test_invalid_no_app(self):
        """Test that a request is rejected once the application is detached from the provider"""
        self.app.provider = None
        self.app.save()
        # NOTE(review): signed by self.provider rather than the federated provider;
        # confirm the missing application is what triggers the rejection.
        token = self._encode_assertion(self.provider, timedelta(hours=2))
        self._assert_invalid_grant(self._request_token(token))

    def test_invalid_access_denied(self):
        """Test that a request is rejected when the application has a group binding"""
        # Newly created group with no members added in this test
        group = Group.objects.create(name="foo")
        PolicyBinding.objects.create(
            group=group,
            target=self.app,
            order=0,
        )
        # NOTE(review): signed by self.provider rather than the federated provider;
        # confirm the policy binding is what triggers the rejection.
        token = self._encode_assertion(self.provider, timedelta(hours=2))
        self._assert_invalid_grant(self._request_token(token))

    def test_successful(self):
        """Test that a stored access token of the federated provider is accepted"""
        user = create_test_user()
        token = self._encode_assertion(self.other_provider, timedelta(hours=2))
        # Persist the assertion as an access token issued by the federated provider
        AccessToken.objects.create(
            provider=self.other_provider,
            token=token,
            user=user,
            auth_time=now(),
        )

        response = self._request_token(token)
        self.assertEqual(response.status_code, 200)
        body = loads(response.content.decode())
        self.assertEqual(body["token_type"], TOKEN_TYPE)
        # The issued token must verify against the key of the provider under test
        _, alg = self.provider.jwt_key
        jwt = decode(
            body["access_token"],
            key=self.provider.signing_key.public_key,
            algorithms=[alg],
            audience=self.provider.client_id,
        )
        self.assertEqual(jwt["given_name"], user.name)
        self.assertEqual(jwt["preferred_username"], user.username)
Test token (client_credentials, with JWT) view
@apply_blueprint('system/providers-oauth2.yaml')
def
setUp(self) -> None:
@apply_blueprint("system/providers-oauth2.yaml")
def setUp(self) -> None:
    """Create two providers with applications; the second federates with the first."""
    super().setUp()
    self.factory = RequestFactory()
    self.other_cert = create_test_cert()
    self.cert = create_test_cert()

    # Provider that is registered below as a JWT federation provider
    federated = OAuth2Provider.objects.create(
        name=generate_id(),
        authorization_flow=create_test_flow(),
        signing_key=self.other_cert,
    )
    federated.property_mappings.set(ScopeMapping.objects.all())
    self.other_provider = federated
    # Only needed in the database; self.app is rebound further down
    self.app = Application.objects.create(
        name=generate_id(),
        slug=generate_id(),
        provider=federated,
    )

    # Provider whose client_id the tests send to the token endpoint
    strict_redirect = RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")
    provider = OAuth2Provider.objects.create(
        name="test",
        authorization_flow=create_test_flow(),
        redirect_uris=[strict_redirect],
        signing_key=self.cert,
    )
    provider.jwt_federation_providers.add(federated)
    provider.property_mappings.set(ScopeMapping.objects.all())
    self.provider: OAuth2Provider = provider
    self.app = Application.objects.create(name="test", slug="test", provider=provider)
Hook method for setting up the test fixture before exercising it.
def
test_invalid_type(self):
def test_invalid_type(self):
    """Test that an unknown client_assertion_type is rejected"""
    token_url = reverse("authentik_providers_oauth2:token")
    requested_scopes = f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}"
    form = {
        "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
        "scope": requested_scopes,
        "client_id": self.provider.client_id,
        # Deliberately not a recognised assertion type
        "client_assertion_type": "foo",
        "client_assertion": "foo.bar",
    }
    response = self.client.post(token_url, form)

    self.assertEqual(response.status_code, 400)
    payload = loads(response.content.decode())
    self.assertEqual(payload["error"], "invalid_grant")
test invalid type
def
test_invalid_jwt(self):
def test_invalid_jwt(self):
    """Test that an assertion which is not a well-formed JWT is rejected"""
    token_url = reverse("authentik_providers_oauth2:token")
    requested_scopes = f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}"
    form = {
        "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
        "scope": requested_scopes,
        "client_id": self.provider.client_id,
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        # Two dot-separated segments only, so not a decodable JWT
        "client_assertion": "foo.bar",
    }
    response = self.client.post(token_url, form)

    self.assertEqual(response.status_code, 400)
    payload = loads(response.content.decode())
    self.assertEqual(payload["error"], "invalid_grant")
test invalid JWT
def
test_invalid_signature(self):
def test_invalid_signature(self):
    """Test that an assertion with a corrupted signature is rejected"""
    token = self.provider.encode(
        {
            "sub": "foo",
            # Timezone-aware clock instead of a naive datetime.now(), so the expiry
            # does not depend on the host's UTC offset (assumes USE_TZ is on -- confirm)
            "exp": now() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            # Appending to the encoded token alters its final (signature) segment
            "client_assertion": token + "foo",
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test that an assertion with a corrupted signature is rejected
def
test_invalid_expired(self):
def test_invalid_expired(self):
    """Test that an assertion whose exp lies two hours in the past is rejected"""
    # NOTE(review): signed by self.provider, whereas test_successful signs with
    # self.other_provider and stores an AccessToken; confirm the rejection here
    # is really caused by the expiry and not by an earlier check.
    token = self.provider.encode(
        {
            "sub": "foo",
            # Timezone-aware clock instead of a naive datetime.now(): with a naive
            # local time the two-hour offset could be cancelled out by the host's
            # UTC offset (assumes USE_TZ is on -- confirm)
            "exp": now() - timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test that an assertion whose exp lies two hours in the past is rejected
def
test_invalid_no_app(self):
def test_invalid_no_app(self):
    """Test that a request is rejected once the application is detached from the provider"""
    # Leave self.provider without an application pointing at it
    self.app.provider = None
    self.app.save()
    # NOTE(review): signed by self.provider rather than the federated provider;
    # confirm the missing application is what triggers the rejection.
    token = self.provider.encode(
        {
            "sub": "foo",
            # Timezone-aware clock instead of a naive datetime.now(), so the expiry
            # does not depend on the host's UTC offset (assumes USE_TZ is on -- confirm)
            "exp": now() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test that a request is rejected once the application is detached from the provider
def
test_invalid_access_denied(self):
def test_invalid_access_denied(self):
    """Test that a request is rejected when the application has a group binding"""
    # Newly created group with no members added in this test
    group = Group.objects.create(name="foo")
    PolicyBinding.objects.create(
        group=group,
        target=self.app,
        order=0,
    )
    # NOTE(review): signed by self.provider rather than the federated provider;
    # confirm the policy binding is what triggers the rejection.
    token = self.provider.encode(
        {
            "sub": "foo",
            # Timezone-aware clock instead of a naive datetime.now(), so the expiry
            # does not depend on the host's UTC offset (assumes USE_TZ is on -- confirm)
            "exp": now() + timedelta(hours=2),
        }
    )
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 400)
    body = loads(response.content.decode())
    self.assertEqual(body["error"], "invalid_grant")
Test that a request is rejected when the application has a group binding
def
test_successful(self):
def test_successful(self):
    """Test that a stored access token of the federated provider is accepted"""
    user = create_test_user()
    token = self.other_provider.encode(
        {
            "sub": "foo",
            # Timezone-aware clock instead of a naive datetime.now(), so the expiry
            # does not depend on the host's UTC offset (assumes USE_TZ is on -- confirm)
            "exp": now() + timedelta(hours=2),
        }
    )
    # Persist the assertion as an access token issued by the federated provider
    AccessToken.objects.create(
        provider=self.other_provider,
        token=token,
        user=user,
        auth_time=now(),
    )

    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        {
            "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
            "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
            "client_id": self.provider.client_id,
            "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
            "client_assertion": token,
        },
    )
    self.assertEqual(response.status_code, 200)
    body = loads(response.content.decode())
    self.assertEqual(body["token_type"], TOKEN_TYPE)
    # The issued token must verify against the key of the provider under test
    _, alg = self.provider.jwt_key
    jwt = decode(
        body["access_token"],
        key=self.provider.signing_key.public_key,
        algorithms=[alg],
        audience=self.provider.client_id,
    )
    self.assertEqual(jwt["given_name"], user.name)
    self.assertEqual(jwt["preferred_username"], user.username)
test successful