authentik.providers.oauth2.tests.test_token_cc_jwt_source

Test token view

  1"""Test token view"""
  2
  3from datetime import datetime, timedelta
  4from json import loads
  5
  6from django.test import RequestFactory
  7from django.urls import reverse
  8from jwt import decode
  9
 10from authentik.blueprints.tests import apply_blueprint
 11from authentik.common.oauth.constants import (
 12    GRANT_TYPE_CLIENT_CREDENTIALS,
 13    SCOPE_OPENID,
 14    SCOPE_OPENID_EMAIL,
 15    SCOPE_OPENID_PROFILE,
 16    TOKEN_TYPE,
 17)
 18from authentik.core.models import USERNAME_MAX_LENGTH, Application, Group, User
 19from authentik.core.tests.utils import create_test_cert, create_test_flow
 20from authentik.lib.generators import generate_id
 21from authentik.policies.models import PolicyBinding
 22from authentik.providers.oauth2.models import (
 23    GrantType,
 24    OAuth2Provider,
 25    RedirectURI,
 26    RedirectURIMatchingMode,
 27    ScopeMapping,
 28)
 29from authentik.providers.oauth2.tests.utils import OAuthTestCase
 30from authentik.providers.oauth2.views.jwks import JWKSView
 31from authentik.sources.oauth.models import OAuthSource, OAuthSourcePropertyMapping
 32
 33
 34class TestTokenClientCredentialsJWTSource(OAuthTestCase):
 35    """Test token (client_credentials, with JWT) view"""
 36
 37    @apply_blueprint("system/providers-oauth2.yaml")
 38    def setUp(self) -> None:
 39        super().setUp()
 40        self.factory = RequestFactory()
 41        self.other_cert = create_test_cert()
 42        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
 43        self.helper_provider = OAuth2Provider.objects.create(
 44            name=generate_id(),
 45            authorization_flow=create_test_flow(),
 46            signing_key=self.other_cert,
 47        )
 48        self.cert = create_test_cert()
 49
 50        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
 51        self.source: OAuthSource = OAuthSource.objects.create(
 52            name=generate_id(),
 53            slug=generate_id(),
 54            provider_type="openidconnect",
 55            consumer_key=generate_id(),
 56            consumer_secret=generate_id(),
 57            authorization_url="http://foo",
 58            access_token_url=f"http://{generate_id()}",
 59            profile_url="http://foo",
 60            oidc_well_known_url="",
 61            oidc_jwks_url="",
 62            oidc_jwks={
 63                "keys": [jwk],
 64            },
 65        )
 66
 67        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
 68            name="test",
 69            authorization_flow=create_test_flow(),
 70            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
 71            signing_key=self.cert,
 72            grant_types=[GrantType.CLIENT_CREDENTIALS],
 73        )
 74        self.provider.jwt_federation_sources.add(self.source)
 75        self.provider.property_mappings.set(ScopeMapping.objects.all())
 76        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
 77
 78    def test_invalid_type(self):
 79        """test invalid type"""
 80        response = self.client.post(
 81            reverse("authentik_providers_oauth2:token"),
 82            {
 83                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 84                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
 85                "client_id": self.provider.client_id,
 86                "client_assertion_type": "foo",
 87                "client_assertion": "foo.bar",
 88            },
 89        )
 90        self.assertEqual(response.status_code, 400)
 91        body = loads(response.content.decode())
 92        self.assertEqual(body["error"], "invalid_grant")
 93
 94    def test_invalid_jwt(self):
 95        """test invalid JWT"""
 96        response = self.client.post(
 97            reverse("authentik_providers_oauth2:token"),
 98            {
 99                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
100                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
101                "client_id": self.provider.client_id,
102                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
103                "client_assertion": "foo.bar",
104            },
105        )
106        self.assertEqual(response.status_code, 400)
107        body = loads(response.content.decode())
108        self.assertEqual(body["error"], "invalid_grant")
109
110    def test_invalid_signature(self):
111        """test invalid JWT"""
112        token = self.helper_provider.encode(
113            {
114                "sub": "foo",
115                "exp": datetime.now() + timedelta(hours=2),
116            }
117        )
118        response = self.client.post(
119            reverse("authentik_providers_oauth2:token"),
120            {
121                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
122                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
123                "client_id": self.provider.client_id,
124                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
125                "client_assertion": token + "foo",
126            },
127        )
128        self.assertEqual(response.status_code, 400)
129        body = loads(response.content.decode())
130        self.assertEqual(body["error"], "invalid_grant")
131
132    def test_invalid_expired(self):
133        """test invalid JWT"""
134        token = self.helper_provider.encode(
135            {
136                "sub": "foo",
137                "exp": datetime.now() - timedelta(hours=2),
138            }
139        )
140        response = self.client.post(
141            reverse("authentik_providers_oauth2:token"),
142            {
143                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
144                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
145                "client_id": self.provider.client_id,
146                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
147                "client_assertion": token,
148            },
149        )
150        self.assertEqual(response.status_code, 400)
151        body = loads(response.content.decode())
152        self.assertEqual(body["error"], "invalid_grant")
153
154    def test_invalid_no_app(self):
155        """test invalid JWT"""
156        self.app.provider = None
157        self.app.save()
158        token = self.helper_provider.encode(
159            {
160                "sub": "foo",
161                "exp": datetime.now() + timedelta(hours=2),
162            }
163        )
164        response = self.client.post(
165            reverse("authentik_providers_oauth2:token"),
166            {
167                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
168                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
169                "client_id": self.provider.client_id,
170                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
171                "client_assertion": token,
172            },
173        )
174        self.assertEqual(response.status_code, 400)
175        body = loads(response.content.decode())
176        self.assertEqual(body["error"], "invalid_grant")
177
178    def test_invalid_access_denied(self):
179        """test invalid JWT"""
180        group = Group.objects.create(name="foo")
181        PolicyBinding.objects.create(
182            group=group,
183            target=self.app,
184            order=0,
185        )
186        token = self.helper_provider.encode(
187            {
188                "sub": "foo",
189                "exp": datetime.now() + timedelta(hours=2),
190            }
191        )
192        response = self.client.post(
193            reverse("authentik_providers_oauth2:token"),
194            {
195                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
196                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
197                "client_id": self.provider.client_id,
198                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
199                "client_assertion": token,
200            },
201        )
202        self.assertEqual(response.status_code, 400)
203        body = loads(response.content.decode())
204        self.assertEqual(body["error"], "invalid_grant")
205
206    def test_successful(self):
207        """test successful"""
208        token = self.helper_provider.encode(
209            {
210                "sub": "foo",
211                "exp": datetime.now() + timedelta(hours=2),
212            }
213        )
214        response = self.client.post(
215            reverse("authentik_providers_oauth2:token"),
216            {
217                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
218                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
219                "client_id": self.provider.client_id,
220                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
221                "client_assertion": token,
222            },
223        )
224        self.assertEqual(response.status_code, 200)
225
226        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
227        self.assertIsNotNone(user)
228
229        body = loads(response.content.decode())
230        self.assertEqual(body["token_type"], TOKEN_TYPE)
231        _, alg = self.provider.jwt_key
232        jwt = decode(
233            body["access_token"],
234            key=self.provider.signing_key.public_key,
235            algorithms=[alg],
236            audience=self.provider.client_id,
237        )
238        self.assertEqual(
239            jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
240        )
241        self.assertEqual(jwt["preferred_username"], "test-foo")
242
243    def test_successful_mapping(self):
244        """test successful"""
245        test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH]
246        mapping = OAuthSourcePropertyMapping.objects.create(
247            name="test-mapping",
248            expression="""return {
249                "email": oauth_userinfo.get("email"),
250                "name": oauth_userinfo.get("name"),
251                "username": oauth_userinfo.get("username"),
252            }""",
253        )
254        self.source.user_property_mappings.add(mapping)
255
256        token = self.helper_provider.encode(
257            {
258                "sub": "foo",
259                "email": "test-user@example.com",
260                "name": "Mapped Test User",
261                "username": "mapped-foo" + ("a" * 150),
262                "exp": datetime.now() + timedelta(hours=2),
263            }
264        )
265        response = self.client.post(
266            reverse("authentik_providers_oauth2:token"),
267            {
268                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
269                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
270                "client_id": self.provider.client_id,
271                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
272                "client_assertion": token,
273            },
274        )
275        self.assertEqual(response.status_code, 200)
276
277        user = User.objects.filter(username=test_username).first()
278        self.assertIsNotNone(user)
279
280        body = loads(response.content.decode())
281        self.assertEqual(body["token_type"], TOKEN_TYPE)
282        key_obj, alg = self.provider.jwt_key
283        jwt = decode(
284            body["access_token"],
285            key=key_obj.public_key(),
286            algorithms=[alg],
287            audience=self.provider.client_id,
288        )
289
290        self.assertEqual(jwt["email"], "test-user@example.com")
291        self.assertEqual(jwt["given_name"], "Mapped Test User")
292        self.assertEqual(jwt["preferred_username"], test_username)
class TestTokenClientCredentialsJWTSource(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 35class TestTokenClientCredentialsJWTSource(OAuthTestCase):
 36    """Test token (client_credentials, with JWT) view"""
 37
 38    @apply_blueprint("system/providers-oauth2.yaml")
 39    def setUp(self) -> None:
 40        super().setUp()
 41        self.factory = RequestFactory()
 42        self.other_cert = create_test_cert()
 43        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
 44        self.helper_provider = OAuth2Provider.objects.create(
 45            name=generate_id(),
 46            authorization_flow=create_test_flow(),
 47            signing_key=self.other_cert,
 48        )
 49        self.cert = create_test_cert()
 50
 51        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
 52        self.source: OAuthSource = OAuthSource.objects.create(
 53            name=generate_id(),
 54            slug=generate_id(),
 55            provider_type="openidconnect",
 56            consumer_key=generate_id(),
 57            consumer_secret=generate_id(),
 58            authorization_url="http://foo",
 59            access_token_url=f"http://{generate_id()}",
 60            profile_url="http://foo",
 61            oidc_well_known_url="",
 62            oidc_jwks_url="",
 63            oidc_jwks={
 64                "keys": [jwk],
 65            },
 66        )
 67
 68        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
 69            name="test",
 70            authorization_flow=create_test_flow(),
 71            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
 72            signing_key=self.cert,
 73            grant_types=[GrantType.CLIENT_CREDENTIALS],
 74        )
 75        self.provider.jwt_federation_sources.add(self.source)
 76        self.provider.property_mappings.set(ScopeMapping.objects.all())
 77        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)
 78
 79    def test_invalid_type(self):
 80        """test invalid type"""
 81        response = self.client.post(
 82            reverse("authentik_providers_oauth2:token"),
 83            {
 84                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
 85                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
 86                "client_id": self.provider.client_id,
 87                "client_assertion_type": "foo",
 88                "client_assertion": "foo.bar",
 89            },
 90        )
 91        self.assertEqual(response.status_code, 400)
 92        body = loads(response.content.decode())
 93        self.assertEqual(body["error"], "invalid_grant")
 94
 95    def test_invalid_jwt(self):
 96        """test invalid JWT"""
 97        response = self.client.post(
 98            reverse("authentik_providers_oauth2:token"),
 99            {
100                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
101                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
102                "client_id": self.provider.client_id,
103                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
104                "client_assertion": "foo.bar",
105            },
106        )
107        self.assertEqual(response.status_code, 400)
108        body = loads(response.content.decode())
109        self.assertEqual(body["error"], "invalid_grant")
110
111    def test_invalid_signature(self):
112        """test invalid JWT"""
113        token = self.helper_provider.encode(
114            {
115                "sub": "foo",
116                "exp": datetime.now() + timedelta(hours=2),
117            }
118        )
119        response = self.client.post(
120            reverse("authentik_providers_oauth2:token"),
121            {
122                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
123                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
124                "client_id": self.provider.client_id,
125                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
126                "client_assertion": token + "foo",
127            },
128        )
129        self.assertEqual(response.status_code, 400)
130        body = loads(response.content.decode())
131        self.assertEqual(body["error"], "invalid_grant")
132
133    def test_invalid_expired(self):
134        """test invalid JWT"""
135        token = self.helper_provider.encode(
136            {
137                "sub": "foo",
138                "exp": datetime.now() - timedelta(hours=2),
139            }
140        )
141        response = self.client.post(
142            reverse("authentik_providers_oauth2:token"),
143            {
144                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
145                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
146                "client_id": self.provider.client_id,
147                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
148                "client_assertion": token,
149            },
150        )
151        self.assertEqual(response.status_code, 400)
152        body = loads(response.content.decode())
153        self.assertEqual(body["error"], "invalid_grant")
154
155    def test_invalid_no_app(self):
156        """test invalid JWT"""
157        self.app.provider = None
158        self.app.save()
159        token = self.helper_provider.encode(
160            {
161                "sub": "foo",
162                "exp": datetime.now() + timedelta(hours=2),
163            }
164        )
165        response = self.client.post(
166            reverse("authentik_providers_oauth2:token"),
167            {
168                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
169                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
170                "client_id": self.provider.client_id,
171                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
172                "client_assertion": token,
173            },
174        )
175        self.assertEqual(response.status_code, 400)
176        body = loads(response.content.decode())
177        self.assertEqual(body["error"], "invalid_grant")
178
179    def test_invalid_access_denied(self):
180        """test invalid JWT"""
181        group = Group.objects.create(name="foo")
182        PolicyBinding.objects.create(
183            group=group,
184            target=self.app,
185            order=0,
186        )
187        token = self.helper_provider.encode(
188            {
189                "sub": "foo",
190                "exp": datetime.now() + timedelta(hours=2),
191            }
192        )
193        response = self.client.post(
194            reverse("authentik_providers_oauth2:token"),
195            {
196                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
197                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
198                "client_id": self.provider.client_id,
199                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
200                "client_assertion": token,
201            },
202        )
203        self.assertEqual(response.status_code, 400)
204        body = loads(response.content.decode())
205        self.assertEqual(body["error"], "invalid_grant")
206
207    def test_successful(self):
208        """test successful"""
209        token = self.helper_provider.encode(
210            {
211                "sub": "foo",
212                "exp": datetime.now() + timedelta(hours=2),
213            }
214        )
215        response = self.client.post(
216            reverse("authentik_providers_oauth2:token"),
217            {
218                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
219                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
220                "client_id": self.provider.client_id,
221                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
222                "client_assertion": token,
223            },
224        )
225        self.assertEqual(response.status_code, 200)
226
227        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
228        self.assertIsNotNone(user)
229
230        body = loads(response.content.decode())
231        self.assertEqual(body["token_type"], TOKEN_TYPE)
232        _, alg = self.provider.jwt_key
233        jwt = decode(
234            body["access_token"],
235            key=self.provider.signing_key.public_key,
236            algorithms=[alg],
237            audience=self.provider.client_id,
238        )
239        self.assertEqual(
240            jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
241        )
242        self.assertEqual(jwt["preferred_username"], "test-foo")
243
244    def test_successful_mapping(self):
245        """test successful"""
246        test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH]
247        mapping = OAuthSourcePropertyMapping.objects.create(
248            name="test-mapping",
249            expression="""return {
250                "email": oauth_userinfo.get("email"),
251                "name": oauth_userinfo.get("name"),
252                "username": oauth_userinfo.get("username"),
253            }""",
254        )
255        self.source.user_property_mappings.add(mapping)
256
257        token = self.helper_provider.encode(
258            {
259                "sub": "foo",
260                "email": "test-user@example.com",
261                "name": "Mapped Test User",
262                "username": "mapped-foo" + ("a" * 150),
263                "exp": datetime.now() + timedelta(hours=2),
264            }
265        )
266        response = self.client.post(
267            reverse("authentik_providers_oauth2:token"),
268            {
269                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
270                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
271                "client_id": self.provider.client_id,
272                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
273                "client_assertion": token,
274            },
275        )
276        self.assertEqual(response.status_code, 200)
277
278        user = User.objects.filter(username=test_username).first()
279        self.assertIsNotNone(user)
280
281        body = loads(response.content.decode())
282        self.assertEqual(body["token_type"], TOKEN_TYPE)
283        key_obj, alg = self.provider.jwt_key
284        jwt = decode(
285            body["access_token"],
286            key=key_obj.public_key(),
287            algorithms=[alg],
288            audience=self.provider.client_id,
289        )
290
291        self.assertEqual(jwt["email"], "test-user@example.com")
292        self.assertEqual(jwt["given_name"], "Mapped Test User")
293        self.assertEqual(jwt["preferred_username"], test_username)

Test token (client_credentials, with JWT) view

@apply_blueprint('system/providers-oauth2.yaml')
def setUp(self) -> None:
38    @apply_blueprint("system/providers-oauth2.yaml")
39    def setUp(self) -> None:
40        super().setUp()
41        self.factory = RequestFactory()
42        self.other_cert = create_test_cert()
43        # Provider used as a helper to sign JWTs with the same key as the OAuth source has
44        self.helper_provider = OAuth2Provider.objects.create(
45            name=generate_id(),
46            authorization_flow=create_test_flow(),
47            signing_key=self.other_cert,
48        )
49        self.cert = create_test_cert()
50
51        jwk = JWKSView().get_jwk_for_key(self.other_cert, "sig")
52        self.source: OAuthSource = OAuthSource.objects.create(
53            name=generate_id(),
54            slug=generate_id(),
55            provider_type="openidconnect",
56            consumer_key=generate_id(),
57            consumer_secret=generate_id(),
58            authorization_url="http://foo",
59            access_token_url=f"http://{generate_id()}",
60            profile_url="http://foo",
61            oidc_well_known_url="",
62            oidc_jwks_url="",
63            oidc_jwks={
64                "keys": [jwk],
65            },
66        )
67
68        self.provider: OAuth2Provider = OAuth2Provider.objects.create(
69            name="test",
70            authorization_flow=create_test_flow(),
71            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
72            signing_key=self.cert,
73            grant_types=[GrantType.CLIENT_CREDENTIALS],
74        )
75        self.provider.jwt_federation_sources.add(self.source)
76        self.provider.property_mappings.set(ScopeMapping.objects.all())
77        self.app = Application.objects.create(name="test", slug="test", provider=self.provider)

Hook method for setting up the test fixture before exercising it.

def test_invalid_type(self):
79    def test_invalid_type(self):
80        """test invalid type"""
81        response = self.client.post(
82            reverse("authentik_providers_oauth2:token"),
83            {
84                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
85                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
86                "client_id": self.provider.client_id,
87                "client_assertion_type": "foo",
88                "client_assertion": "foo.bar",
89            },
90        )
91        self.assertEqual(response.status_code, 400)
92        body = loads(response.content.decode())
93        self.assertEqual(body["error"], "invalid_grant")

test invalid type

def test_invalid_jwt(self):
 95    def test_invalid_jwt(self):
 96        """test invalid JWT"""
 97        response = self.client.post(
 98            reverse("authentik_providers_oauth2:token"),
 99            {
100                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
101                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
102                "client_id": self.provider.client_id,
103                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
104                "client_assertion": "foo.bar",
105            },
106        )
107        self.assertEqual(response.status_code, 400)
108        body = loads(response.content.decode())
109        self.assertEqual(body["error"], "invalid_grant")

test invalid JWT

def test_invalid_signature(self):
111    def test_invalid_signature(self):
112        """test invalid JWT"""
113        token = self.helper_provider.encode(
114            {
115                "sub": "foo",
116                "exp": datetime.now() + timedelta(hours=2),
117            }
118        )
119        response = self.client.post(
120            reverse("authentik_providers_oauth2:token"),
121            {
122                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
123                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
124                "client_id": self.provider.client_id,
125                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
126                "client_assertion": token + "foo",
127            },
128        )
129        self.assertEqual(response.status_code, 400)
130        body = loads(response.content.decode())
131        self.assertEqual(body["error"], "invalid_grant")

test JWT with an invalid signature

def test_invalid_expired(self):
133    def test_invalid_expired(self):
134        """test invalid JWT"""
135        token = self.helper_provider.encode(
136            {
137                "sub": "foo",
138                "exp": datetime.now() - timedelta(hours=2),
139            }
140        )
141        response = self.client.post(
142            reverse("authentik_providers_oauth2:token"),
143            {
144                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
145                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
146                "client_id": self.provider.client_id,
147                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
148                "client_assertion": token,
149            },
150        )
151        self.assertEqual(response.status_code, 400)
152        body = loads(response.content.decode())
153        self.assertEqual(body["error"], "invalid_grant")

test expired JWT

def test_invalid_no_app(self):
155    def test_invalid_no_app(self):
156        """test invalid JWT"""
157        self.app.provider = None
158        self.app.save()
159        token = self.helper_provider.encode(
160            {
161                "sub": "foo",
162                "exp": datetime.now() + timedelta(hours=2),
163            }
164        )
165        response = self.client.post(
166            reverse("authentik_providers_oauth2:token"),
167            {
168                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
169                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
170                "client_id": self.provider.client_id,
171                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
172                "client_assertion": token,
173            },
174        )
175        self.assertEqual(response.status_code, 400)
176        body = loads(response.content.decode())
177        self.assertEqual(body["error"], "invalid_grant")

test valid JWT when the application has no provider assigned

def test_invalid_access_denied(self):
179    def test_invalid_access_denied(self):
180        """test invalid JWT"""
181        group = Group.objects.create(name="foo")
182        PolicyBinding.objects.create(
183            group=group,
184            target=self.app,
185            order=0,
186        )
187        token = self.helper_provider.encode(
188            {
189                "sub": "foo",
190                "exp": datetime.now() + timedelta(hours=2),
191            }
192        )
193        response = self.client.post(
194            reverse("authentik_providers_oauth2:token"),
195            {
196                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
197                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
198                "client_id": self.provider.client_id,
199                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
200                "client_assertion": token,
201            },
202        )
203        self.assertEqual(response.status_code, 400)
204        body = loads(response.content.decode())
205        self.assertEqual(body["error"], "invalid_grant")

test valid JWT when a group policy binding is attached to the application

def test_successful(self):
207    def test_successful(self):
208        """test successful"""
209        token = self.helper_provider.encode(
210            {
211                "sub": "foo",
212                "exp": datetime.now() + timedelta(hours=2),
213            }
214        )
215        response = self.client.post(
216            reverse("authentik_providers_oauth2:token"),
217            {
218                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
219                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
220                "client_id": self.provider.client_id,
221                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
222                "client_assertion": token,
223            },
224        )
225        self.assertEqual(response.status_code, 200)
226
227        user = User.objects.filter(username=f"{self.provider.name}-foo").first()
228        self.assertIsNotNone(user)
229
230        body = loads(response.content.decode())
231        self.assertEqual(body["token_type"], TOKEN_TYPE)
232        _, alg = self.provider.jwt_key
233        jwt = decode(
234            body["access_token"],
235            key=self.provider.signing_key.public_key,
236            algorithms=[alg],
237            audience=self.provider.client_id,
238        )
239        self.assertEqual(
240            jwt["given_name"], "Autogenerated user from application test (client credentials JWT)"
241        )
242        self.assertEqual(jwt["preferred_username"], "test-foo")

test successful

def test_successful_mapping(self):
244    def test_successful_mapping(self):
245        """test successful"""
246        test_username = ("mapped-foo" + ("a" * 150))[:USERNAME_MAX_LENGTH]
247        mapping = OAuthSourcePropertyMapping.objects.create(
248            name="test-mapping",
249            expression="""return {
250                "email": oauth_userinfo.get("email"),
251                "name": oauth_userinfo.get("name"),
252                "username": oauth_userinfo.get("username"),
253            }""",
254        )
255        self.source.user_property_mappings.add(mapping)
256
257        token = self.helper_provider.encode(
258            {
259                "sub": "foo",
260                "email": "test-user@example.com",
261                "name": "Mapped Test User",
262                "username": "mapped-foo" + ("a" * 150),
263                "exp": datetime.now() + timedelta(hours=2),
264            }
265        )
266        response = self.client.post(
267            reverse("authentik_providers_oauth2:token"),
268            {
269                "grant_type": GRANT_TYPE_CLIENT_CREDENTIALS,
270                "scope": f"{SCOPE_OPENID} {SCOPE_OPENID_EMAIL} {SCOPE_OPENID_PROFILE}",
271                "client_id": self.provider.client_id,
272                "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
273                "client_assertion": token,
274            },
275        )
276        self.assertEqual(response.status_code, 200)
277
278        user = User.objects.filter(username=test_username).first()
279        self.assertIsNotNone(user)
280
281        body = loads(response.content.decode())
282        self.assertEqual(body["token_type"], TOKEN_TYPE)
283        key_obj, alg = self.provider.jwt_key
284        jwt = decode(
285            body["access_token"],
286            key=key_obj.public_key(),
287            algorithms=[alg],
288            audience=self.provider.client_id,
289        )
290
291        self.assertEqual(jwt["email"], "test-user@example.com")
292        self.assertEqual(jwt["given_name"], "Mapped Test User")
293        self.assertEqual(jwt["preferred_username"], test_username)

test successful with a source property mapping