authentik.providers.oauth2.tests.test_token_cc_jwt_source

Test token view

  1"""Test token view"""
  2
from datetime import datetime, timedelta, timezone
from json import loads

from django.test import RequestFactory
from django.urls import reverse
from jwt import decode

from authentik.blueprints.tests import apply_blueprint
from authentik.common.oauth.constants import (
    GRANT_TYPE_CLIENT_CREDENTIALS,
    SCOPE_OPENID,
    SCOPE_OPENID_EMAIL,
    SCOPE_OPENID_PROFILE,
    TOKEN_TYPE,
)
from authentik.core.models import USERNAME_MAX_LENGTH, Application, Group, User
from authentik.core.tests.utils import create_test_cert, create_test_flow
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import (
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ScopeMapping,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.views.jwks import JWKSView
from authentik.sources.oauth.models import OAuthSource, OAuthSourcePropertyMapping
 31
 32
 33class TestTokenClientCredentialsJWTSource(OAuthTestCase):
 34    """Test token (client_credentials, with JWT) view"""
 35
 36    @apply_blueprint("system/providers-oauth2.yaml")
 37    def setUp(self) -> None:
 38        super().setUp()
 39        self.factory = RequestFactory()
 40        self.other_cert = create_test_cert()
 41        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
 42        self.helper_provider = OAuth2Provider.objects.create(
 43            name=generate_id(),
 44            authorization_flow=create_test_flow(),
 45            signing_key=self.other_cert,
 46        )
 47        self.cert = create_test_cert()
 48
 49        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
 50        self.source: OAuthSource = OAuthSource.objects.create(
 51            name=generate_id(),
 52            slug=generate_id(),
 53            provider_type="openidconnect",
 54            consumer_key=generate_id(),
 55            consumer_secret=generate_id(),
 56            authorization_url="http://foo",
 57            access_token_url=f"http://{generate_id()}",
 58            profile_url="http://foo",
 59            oidc_well_known_url="",
 60            oidc_jwks_url="",
 61            oidc_jwks={
 62                "keys": [jwk],
 63            },
 64        )
 65
 66        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
 67            name="test",
 68            authorization_flow=create_test_flow(),
 69            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
 70            signing_key=self.cert,
 71        )
 72        self.provider.jwt_federation_sources.add(self.source)
 73        self.provider.property_mappings.set(ScopeMapping.objects.all())
 74        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
 75
 76    def test_invalid_type(self):
 77        """test invalid type"""
 78        response = self.client.post(
 79            reverse("authentik_providers_oauth2:token"),
 80            {
 81                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 82                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
 83                "client_id": self.provider.client_id,
 84                "client_assertion_type": "foo",
 85                "client_assertion": "foo.bar",
 86            },
 87        )
 88        self.assertEqual(response.status_code, 400)
 89        body = loads(response.content.decode())
 90        self.assertEqual(body["error"], "invalid_grant")
 91
 92    def test_invalid_jwt(self):
 93        """test invalid JWT"""
 94        response = self.client.post(
 95            reverse("authentik_providers_oauth2:token"),
 96            {
 97                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 98                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
 99                "client_id": self.provider.client_id,
100                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
101                "client_assertion": "foo.bar",
102            },
103        )
104        self.assertEqual(response.status_code, 400)
105        body = loads(response.content.decode())
106        self.assertEqual(body["error"], "invalid_grant")
107
108    def test_invalid_signature(self):
109        """test invalid JWT"""
110        token = self.helper_provider.encode(
111            {
112                "sub": "foo",
113                "exp": datetime.now() + timedelta(hours=2),
114            }
115        )
116        response = self.client.post(
117            reverse("authentik_providers_oauth2:token"),
118            {
119                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
120                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
121                "client_id": self.provider.client_id,
122                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
123                "client_assertion": token + "foo",
124            },
125        )
126        self.assertEqual(response.status_code, 400)
127        body = loads(response.content.decode())
128        self.assertEqual(body["error"], "invalid_grant")
129
130    def test_invalid_expired(self):
131        """test invalid JWT"""
132        token = self.helper_provider.encode(
133            {
134                "sub": "foo",
135                "exp": datetime.now() - timedelta(hours=2),
136            }
137        )
138        response = self.client.post(
139            reverse("authentik_providers_oauth2:token"),
140            {
141                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
142                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
143                "client_id": self.provider.client_id,
144                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
145                "client_assertion": token,
146            },
147        )
148        self.assertEqual(response.status_code, 400)
149        body = loads(response.content.decode())
150        self.assertEqual(body["error"], "invalid_grant")
151
152    def test_invalid_no_app(self):
153        """test invalid JWT"""
154        self.app.provider = None
155        self.app.save()
156        token = self.helper_provider.encode(
157            {
158                "sub": "foo",
159                "exp": datetime.now() + timedelta(hours=2),
160            }
161        )
162        response = self.client.post(
163            reverse("authentik_providers_oauth2:token"),
164            {
165                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
166                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
167                "client_id": self.provider.client_id,
168                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
169                "client_assertion": token,
170            },
171        )
172        self.assertEqual(response.status_code, 400)
173        body = loads(response.content.decode())
174        self.assertEqual(body["error"], "invalid_grant")
175
176    def test_invalid_access_denied(self):
177        """test invalid JWT"""
178        group = Group.objects.create(name="foo")
179        PolicyBinding.objects.create(
180            group=group,
181            target=self.app,
182            order=0,
183        )
184        token = self.helper_provider.encode(
185            {
186                "sub": "foo",
187                "exp": datetime.now() + timedelta(hours=2),
188            }
189        )
190        response = self.client.post(
191            reverse("authentik_providers_oauth2:token"),
192            {
193                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
194                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
195                "client_id": self.provider.client_id,
196                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
197                "client_assertion": token,
198            },
199        )
200        self.assertEqual(response.status_code, 400)
201        body = loads(response.content.decode())
202        self.assertEqual(body["error"], "invalid_grant")
203
204    def test_successful(self):
205        """test successful"""
206        token = self.helper_provider.encode(
207            {
208                "sub": "foo",
209                "exp": datetime.now() + timedelta(hours=2),
210            }
211        )
212        response = self.client.post(
213            reverse("authentik_providers_oauth2:token"),
214            {
215                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
216                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
217                "client_id": self.provider.client_id,
218                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
219                "client_assertion": token,
220            },
221        )
222        self.assertEqual(response.status_code, 200)
223
224        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
225        self.assertIsNotNone(user)
226
227        body = loads(response.content.decode())
228        self.assertEqual(body["token_type"], TOKEN_TYPE)
229        _, alg = self.provider.jwt_key
230        jwt = decode(
231            body["access_token"],
232            key=self.provider.signing_key.public_key,
233            algorithms=[alg],
234            audience=self.provider.client_id,
235        )
236        self.assertEqual(
237            jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
238        )
239        self.assertEqual(jwt["preferred_username"], "test-foo")
240
241    def test_successful_mapping(self):
242        """test successful"""
243        test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH]
244        mapping = OAuthSourcePropertyMapping.objects.create(
245            name="test-mapping",
246            expression="""return {
247                "email": oauth_userinfo.get("email"),
248                "name": oauth_userinfo.get("name"),
249                "username": oauth_userinfo.get("username"),
250            }""",
251        )
252        self.source.user_property_mappings.add(mapping)
253
254        token = self.helper_provider.encode(
255            {
256                "sub": "foo",
257                "email": "test-user@example.com",
258                "name": "Mapped Test User",
259                "username": "mapped-foo" + ("a" * 150),
260                "exp": datetime.now() + timedelta(hours=2),
261            }
262        )
263        response = self.client.post(
264            reverse("authentik_providers_oauth2:token"),
265            {
266                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
267                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
268                "client_id": self.provider.client_id,
269                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
270                "client_assertion": token,
271            },
272        )
273        self.assertEqual(response.status_code, 200)
274
275        user = User.objects.filter(username=test_username).first()
276        self.assertIsNotNone(user)
277
278        body = loads(response.content.decode())
279        self.assertEqual(body["token_type"], TOKEN_TYPE)
280        key_obj, alg = self.provider.jwt_key
281        jwt = decode(
282            body["access_token"],
283            key=key_obj.public_key(),
284            algorithms=[alg],
285            audience=self.provider.client_id,
286        )
287
288        self.assertEqual(jwt["email"], "test-user@example.com")
289        self.assertEqual(jwt["given_name"], "Mapped Test User")
290        self.assertEqual(jwt["preferred_username"], test_username)
class TestTokenClientCredentialsJWTSource(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 34class TestTokenClientCredentialsJWTSource(OAuthTestCase):
 35    """Test token (client_credentials, with JWT) view"""
 36
 37    @apply_blueprint("system/providers-oauth2.yaml")
 38    def setUp(self) -> None:
 39        super().setUp()
 40        self.factory = RequestFactory()
 41        self.other_cert = create_test_cert()
 42        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
 43        self.helper_provider = OAuth2Provider.objects.create(
 44            name=generate_id(),
 45            authorization_flow=create_test_flow(),
 46            signing_key=self.other_cert,
 47        )
 48        self.cert = create_test_cert()
 49
 50        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
 51        self.source: OAuthSource = OAuthSource.objects.create(
 52            name=generate_id(),
 53            slug=generate_id(),
 54            provider_type="openidconnect",
 55            consumer_key=generate_id(),
 56            consumer_secret=generate_id(),
 57            authorization_url="http://foo",
 58            access_token_url=f"http://{generate_id()}",
 59            profile_url="http://foo",
 60            oidc_well_known_url="",
 61            oidc_jwks_url="",
 62            oidc_jwks={
 63                "keys": [jwk],
 64            },
 65        )
 66
 67        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
 68            name="test",
 69            authorization_flow=create_test_flow(),
 70            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
 71            signing_key=self.cert,
 72        )
 73        self.provider.jwt_federation_sources.add(self.source)
 74        self.provider.property_mappings.set(ScopeMapping.objects.all())
 75        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
 76
 77    def test_invalid_type(self):
 78        """test invalid type"""
 79        response = self.client.post(
 80            reverse("authentik_providers_oauth2:token"),
 81            {
 82                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 83                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
 84                "client_id": self.provider.client_id,
 85                "client_assertion_type": "foo",
 86                "client_assertion": "foo.bar",
 87            },
 88        )
 89        self.assertEqual(response.status_code, 400)
 90        body = loads(response.content.decode())
 91        self.assertEqual(body["error"], "invalid_grant")
 92
 93    def test_invalid_jwt(self):
 94        """test invalid JWT"""
 95        response = self.client.post(
 96            reverse("authentik_providers_oauth2:token"),
 97            {
 98                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 99                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
100                "client_id": self.provider.client_id,
101                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
102                "client_assertion": "foo.bar",
103            },
104        )
105        self.assertEqual(response.status_code, 400)
106        body = loads(response.content.decode())
107        self.assertEqual(body["error"], "invalid_grant")
108
109    def test_invalid_signature(self):
110        """test invalid JWT"""
111        token = self.helper_provider.encode(
112            {
113                "sub": "foo",
114                "exp": datetime.now() + timedelta(hours=2),
115            }
116        )
117        response = self.client.post(
118            reverse("authentik_providers_oauth2:token"),
119            {
120                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
121                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
122                "client_id": self.provider.client_id,
123                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
124                "client_assertion": token + "foo",
125            },
126        )
127        self.assertEqual(response.status_code, 400)
128        body = loads(response.content.decode())
129        self.assertEqual(body["error"], "invalid_grant")
130
131    def test_invalid_expired(self):
132        """test invalid JWT"""
133        token = self.helper_provider.encode(
134            {
135                "sub": "foo",
136                "exp": datetime.now() - timedelta(hours=2),
137            }
138        )
139        response = self.client.post(
140            reverse("authentik_providers_oauth2:token"),
141            {
142                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
143                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
144                "client_id": self.provider.client_id,
145                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
146                "client_assertion": token,
147            },
148        )
149        self.assertEqual(response.status_code, 400)
150        body = loads(response.content.decode())
151        self.assertEqual(body["error"], "invalid_grant")
152
153    def test_invalid_no_app(self):
154        """test invalid JWT"""
155        self.app.provider = None
156        self.app.save()
157        token = self.helper_provider.encode(
158            {
159                "sub": "foo",
160                "exp": datetime.now() + timedelta(hours=2),
161            }
162        )
163        response = self.client.post(
164            reverse("authentik_providers_oauth2:token"),
165            {
166                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
167                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
168                "client_id": self.provider.client_id,
169                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
170                "client_assertion": token,
171            },
172        )
173        self.assertEqual(response.status_code, 400)
174        body = loads(response.content.decode())
175        self.assertEqual(body["error"], "invalid_grant")
176
177    def test_invalid_access_denied(self):
178        """test invalid JWT"""
179        group = Group.objects.create(name="foo")
180        PolicyBinding.objects.create(
181            group=group,
182            target=self.app,
183            order=0,
184        )
185        token = self.helper_provider.encode(
186            {
187                "sub": "foo",
188                "exp": datetime.now() + timedelta(hours=2),
189            }
190        )
191        response = self.client.post(
192            reverse("authentik_providers_oauth2:token"),
193            {
194                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
195                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
196                "client_id": self.provider.client_id,
197                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
198                "client_assertion": token,
199            },
200        )
201        self.assertEqual(response.status_code, 400)
202        body = loads(response.content.decode())
203        self.assertEqual(body["error"], "invalid_grant")
204
205    def test_successful(self):
206        """test successful"""
207        token = self.helper_provider.encode(
208            {
209                "sub": "foo",
210                "exp": datetime.now() + timedelta(hours=2),
211            }
212        )
213        response = self.client.post(
214            reverse("authentik_providers_oauth2:token"),
215            {
216                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
217                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
218                "client_id": self.provider.client_id,
219                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
220                "client_assertion": token,
221            },
222        )
223        self.assertEqual(response.status_code, 200)
224
225        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
226        self.assertIsNotNone(user)
227
228        body = loads(response.content.decode())
229        self.assertEqual(body["token_type"], TOKEN_TYPE)
230        _, alg = self.provider.jwt_key
231        jwt = decode(
232            body["access_token"],
233            key=self.provider.signing_key.public_key,
234            algorithms=[alg],
235            audience=self.provider.client_id,
236        )
237        self.assertEqual(
238            jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
239        )
240        self.assertEqual(jwt["preferred_username"], "test-foo")
241
242    def test_successful_mapping(self):
243        """test successful"""
244        test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH]
245        mapping = OAuthSourcePropertyMapping.objects.create(
246            name="test-mapping",
247            expression="""return {
248                "email": oauth_userinfo.get("email"),
249                "name": oauth_userinfo.get("name"),
250                "username": oauth_userinfo.get("username"),
251            }""",
252        )
253        self.source.user_property_mappings.add(mapping)
254
255        token = self.helper_provider.encode(
256            {
257                "sub": "foo",
258                "email": "test-user@example.com",
259                "name": "Mapped Test User",
260                "username": "mapped-foo" + ("a" * 150),
261                "exp": datetime.now() + timedelta(hours=2),
262            }
263        )
264        response = self.client.post(
265            reverse("authentik_providers_oauth2:token"),
266            {
267                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
268                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
269                "client_id": self.provider.client_id,
270                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
271                "client_assertion": token,
272            },
273        )
274        self.assertEqual(response.status_code, 200)
275
276        user = User.objects.filter(username=test_username).first()
277        self.assertIsNotNone(user)
278
279        body = loads(response.content.decode())
280        self.assertEqual(body["token_type"], TOKEN_TYPE)
281        key_obj, alg = self.provider.jwt_key
282        jwt = decode(
283            body["access_token"],
284            key=key_obj.public_key(),
285            algorithms=[alg],
286            audience=self.provider.client_id,
287        )
288
289        self.assertEqual(jwt["email"], "test-user@example.com")
290        self.assertEqual(jwt["given_name"], "Mapped Test User")
291        self.assertEqual(jwt["preferred_username"], test_username)

Test token (client_credentials, with JWT) view

@apply_blueprint('system/providers-oauth2.yaml')
def setUp(self) -> None:
37    @apply_blueprint("system/providers-oauth2.yaml")
38    def setUp(self) -> None:
39        super().setUp()
40        self.factory = RequestFactory()
41        self.other_cert = create_test_cert()
42        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
43        self.helper_provider = OAuth2Provider.objects.create(
44            name=generate_id(),
45            authorization_flow=create_test_flow(),
46            signing_key=self.other_cert,
47        )
48        self.cert = create_test_cert()
49
50        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
51        self.source: OAuthSource = OAuthSource.objects.create(
52            name=generate_id(),
53            slug=generate_id(),
54            provider_type="openidconnect",
55            consumer_key=generate_id(),
56            consumer_secret=generate_id(),
57            authorization_url="http://foo",
58            access_token_url=f"http://{generate_id()}",
59            profile_url="http://foo",
60            oidc_well_known_url="",
61            oidc_jwks_url="",
62            oidc_jwks={
63                "keys": [jwk],
64            },
65        )
66
67        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
68            name="test",
69            authorization_flow=create_test_flow(),
70            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
71            signing_key=self.cert,
72        )
73        self.provider.jwt_federation_sources.add(self.source)
74        self.provider.property_mappings.set(ScopeMapping.objects.all())
75        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

Hook method for setting up the test fixture before exercising it.

def test_invalid_type(self):
77    def test_invalid_type(self):
78        """test invalid type"""
79        response = self.client.post(
80            reverse("authentik_providers_oauth2:token"),
81            {
82                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
83                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
84                "client_id": self.provider.client_id,
85                "client_assertion_type": "foo",
86                "client_assertion": "foo.bar",
87            },
88        )
89        self.assertEqual(response.status_code, 400)
90        body = loads(response.content.decode())
91        self.assertEqual(body["error"], "invalid_grant")

test invalid type

def test_invalid_jwt(self):
 93    def test_invalid_jwt(self):
 94        """test invalid JWT"""
 95        response = self.client.post(
 96            reverse("authentik_providers_oauth2:token"),
 97            {
 98                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 99                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
100                "client_id": self.provider.client_id,
101                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
102                "client_assertion": "foo.bar",
103            },
104        )
105        self.assertEqual(response.status_code, 400)
106        body = loads(response.content.decode())
107        self.assertEqual(body["error"], "invalid_grant")

test invalid JWT

def test_invalid_signature(self):
109    def test_invalid_signature(self):
110        """test invalid JWT"""
111        token = self.helper_provider.encode(
112            {
113                "sub": "foo",
114                "exp": datetime.now() + timedelta(hours=2),
115            }
116        )
117        response = self.client.post(
118            reverse("authentik_providers_oauth2:token"),
119            {
120                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
121                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
122                "client_id": self.provider.client_id,
123                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
124                "client_assertion": token + "foo",
125            },
126        )
127        self.assertEqual(response.status_code, 400)
128        body = loads(response.content.decode())
129        self.assertEqual(body["error"], "invalid_grant")

Test that a JWT whose signature has been tampered with is rejected

def test_invalid_expired(self):
131    def test_invalid_expired(self):
132        """test invalid JWT"""
133        token = self.helper_provider.encode(
134            {
135                "sub": "foo",
136                "exp": datetime.now() - timedelta(hours=2),
137            }
138        )
139        response = self.client.post(
140            reverse("authentik_providers_oauth2:token"),
141            {
142                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
143                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
144                "client_id": self.provider.client_id,
145                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
146                "client_assertion": token,
147            },
148        )
149        self.assertEqual(response.status_code, 400)
150        body = loads(response.content.decode())
151        self.assertEqual(body["error"], "invalid_grant")

Test that an expired JWT is rejected

def test_invalid_no_app(self):
153    def test_invalid_no_app(self):
154        """test invalid JWT"""
155        self.app.provider = None
156        self.app.save()
157        token = self.helper_provider.encode(
158            {
159                "sub": "foo",
160                "exp": datetime.now() + timedelta(hours=2),
161            }
162        )
163        response = self.client.post(
164            reverse("authentik_providers_oauth2:token"),
165            {
166                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
167                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
168                "client_id": self.provider.client_id,
169                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
170                "client_assertion": token,
171            },
172        )
173        self.assertEqual(response.status_code, 400)
174        body = loads(response.content.decode())
175        self.assertEqual(body["error"], "invalid_grant")

Test that a valid JWT is rejected when the provider has no application

def test_invalid_access_denied(self):
177    def test_invalid_access_denied(self):
178        """test invalid JWT"""
179        group = Group.objects.create(name="foo")
180        PolicyBinding.objects.create(
181            group=group,
182            target=self.app,
183            order=0,
184        )
185        token = self.helper_provider.encode(
186            {
187                "sub": "foo",
188                "exp": datetime.now() + timedelta(hours=2),
189            }
190        )
191        response = self.client.post(
192            reverse("authentik_providers_oauth2:token"),
193            {
194                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
195                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
196                "client_id": self.provider.client_id,
197                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
198                "client_assertion": token,
199            },
200        )
201        self.assertEqual(response.status_code, 400)
202        body = loads(response.content.decode())
203        self.assertEqual(body["error"], "invalid_grant")

Test that a valid JWT is rejected when a group policy binding restricts the application

def test_successful(self):
205    def test_successful(self):
206        """test successful"""
207        token = self.helper_provider.encode(
208            {
209                "sub": "foo",
210                "exp": datetime.now() + timedelta(hours=2),
211            }
212        )
213        response = self.client.post(
214            reverse("authentik_providers_oauth2:token"),
215            {
216                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
217                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
218                "client_id": self.provider.client_id,
219                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
220                "client_assertion": token,
221            },
222        )
223        self.assertEqual(response.status_code, 200)
224
225        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
226        self.assertIsNotNone(user)
227
228        body = loads(response.content.decode())
229        self.assertEqual(body["token_type"], TOKEN_TYPE)
230        _, alg = self.provider.jwt_key
231        jwt = decode(
232            body["access_token"],
233            key=self.provider.signing_key.public_key,
234            algorithms=[alg],
235            audience=self.provider.client_id,
236        )
237        self.assertEqual(
238            jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
239        )
240        self.assertEqual(jwt["preferred_username"], "test-foo")

test successful

def test_successful_mapping(self):
242    def test_successful_mapping(self):
243        """test successful"""
244        test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH]
245        mapping = OAuthSourcePropertyMapping.objects.create(
246            name="test-mapping",
247            expression="""return {
248                "email": oauth_userinfo.get("email"),
249                "name": oauth_userinfo.get("name"),
250                "username": oauth_userinfo.get("username"),
251            }""",
252        )
253        self.source.user_property_mappings.add(mapping)
254
255        token = self.helper_provider.encode(
256            {
257                "sub": "foo",
258                "email": "test-user@example.com",
259                "name": "Mapped Test User",
260                "username": "mapped-foo" + ("a" * 150),
261                "exp": datetime.now() + timedelta(hours=2),
262            }
263        )
264        response = self.client.post(
265            reverse("authentik_providers_oauth2:token"),
266            {
267                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
268                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
269                "client_id": self.provider.client_id,
270                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
271                "client_assertion": token,
272            },
273        )
274        self.assertEqual(response.status_code, 200)
275
276        user = User.objects.filter(username=test_username).first()
277        self.assertIsNotNone(user)
278
279        body = loads(response.content.decode())
280        self.assertEqual(body["token_type"], TOKEN_TYPE)
281        key_obj, alg = self.provider.jwt_key
282        jwt = decode(
283            body["access_token"],
284            key=key_obj.public_key(),
285            algorithms=[alg],
286            audience=self.provider.client_id,
287        )
288
289        self.assertEqual(jwt["email"], "test-user@example.com")
290        self.assertEqual(jwt["given_name"], "Mapped Test User")
291        self.assertEqual(jwt["preferred_username"], test_username)

Test a successful request where a source property mapping supplies the user attributes