authentik.providers.oauth2.tests.test_token

Test token view

  1"""Test token view"""
  2
  3from base64 import b64encode
  4from json import dumps
  5from urllib.parse import quote
  6
  7from django.test import RequestFactory
  8from django.urls import reverse
  9from django.utils import timezone
 10
 11from authentik.blueprints.tests import apply_blueprint
 12from authentik.common.oauth.constants import (
 13    GRANT_TYPE_AUTHORIZATION_CODE,
 14    GRANT_TYPE_REFRESH_TOKEN,
 15    TOKEN_TYPE,
 16)
 17from authentik.core.models import Application
 18from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 19from authentik.events.models import Event, EventAction
 20from authentik.lib.generators import generate_id, generate_key
 21from authentik.providers.oauth2.errors import TokenError
 22from authentik.providers.oauth2.models import (
 23    AccessToken,
 24    AuthorizationCode,
 25    OAuth2Provider,
 26    RedirectURI,
 27    RedirectURIMatchingMode,
 28    RefreshToken,
 29    ScopeMapping,
 30)
 31from authentik.providers.oauth2.tests.utils import OAuthTestCase
 32from authentik.providers.oauth2.utils import extract_client_auth
 33from authentik.providers.oauth2.views.token import TokenParams
 34
 35
 36class TestToken(OAuthTestCase):
 37    """Test token view"""
 38
 39    def setUp(self) -> None:
 40        super().setUp()
 41        self.factory = RequestFactory()
 42        self.app = Application.objects.create(name=generate_id(), slug="test")
 43
 44    def test_request_auth_code(self):
 45        """test request param"""
 46        provider = OAuth2Provider.objects.create(
 47            name=generate_id(),
 48            authorization_flow=create_test_flow(),
 49            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 50            signing_key=self.keypair,
 51        )
 52        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 53        user = create_test_admin_user()
 54        code = AuthorizationCode.objects.create(
 55            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 56        )
 57        request = self.factory.post(
 58            "/",
 59            data={
 60                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 61                "code": code.code,
 62                "redirect_uri": "http://TestServer",
 63            },
 64            HTTP_AUTHORIZATION=f"Basic {header}",
 65        )
 66        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 67        self.assertEqual(params.provider, provider)
 68        with self.assertRaises(TokenError):
 69            TokenParams.parse(request, provider, provider.client_id, generate_key())
 70
 71    def test_request_auth_code_invalid(self):
 72        """test request param"""
 73        provider = OAuth2Provider.objects.create(
 74            name=generate_id(),
 75            authorization_flow=create_test_flow(),
 76            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
 77            signing_key=self.keypair,
 78        )
 79        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 80        request = self.factory.post(
 81            "/",
 82            data={
 83                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 84                "code": "foo",
 85                "redirect_uri": "http://testserver",
 86            },
 87            HTTP_AUTHORIZATION=f"Basic {header}",
 88        )
 89        with self.assertRaises(TokenError):
 90            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 91
 92    def test_request_refresh_token(self):
 93        """test request param"""
 94        provider = OAuth2Provider.objects.create(
 95            name=generate_id(),
 96            authorization_flow=create_test_flow(),
 97            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
 98            signing_key=self.keypair,
 99        )
100        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
101        user = create_test_admin_user()
102        token: RefreshToken = RefreshToken.objects.create(
103            provider=provider,
104            user=user,
105            token=generate_id(),
106            auth_time=timezone.now(),
107        )
108        request = self.factory.post(
109            "/",
110            data={
111                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
112                "refresh_token": token.token,
113                "redirect_uri": "http://local.invalid",
114            },
115            HTTP_AUTHORIZATION=f"Basic {header}",
116        )
117        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
118        self.assertEqual(params.provider, provider)
119
120    def test_extract_client_auth_basic_auth_percent_decodes(self):
121        """test percent-decoding of client credentials in Basic auth"""
122        header = b64encode(
123            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
124        ).decode()
125        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
126        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))
127
128    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
129        """test compatibility with clients that still send raw plus characters"""
130        header = b64encode(b"client:secret+plus").decode()
131        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
132        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))
133
134    def test_auth_code_view(self):
135        """test request param"""
136        provider = OAuth2Provider.objects.create(
137            name=generate_id(),
138            authorization_flow=create_test_flow(),
139            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
140            signing_key=self.keypair,
141        )
142        # Needs to be assigned to an application for iss to be set
143        self.app.provider = provider
144        self.app.save()
145        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
146        user = create_test_admin_user()
147        code = AuthorizationCode.objects.create(
148            code="foobar", provider=provider, user=user, auth_time=timezone.now()
149        )
150        response = self.client.post(
151            reverse("authentik_providers_oauth2:token"),
152            data={
153                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
154                "code": code.code,
155                "redirect_uri": "http://local.invalid",
156            },
157            HTTP_AUTHORIZATION=f"Basic {header}",
158        )
159        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
160        self.assertJSONEqual(
161            response.content.decode(),
162            {
163                "access_token": access.token,
164                "token_type": TOKEN_TYPE,
165                "expires_in": 3600,
166                "id_token": provider.encode(
167                    access.id_token.to_dict(),
168                ),
169                "scope": "",
170            },
171        )
172        self.validate_jwt(access, provider)
173
174    def test_auth_code_enc(self):
175        """test request param"""
176        provider = OAuth2Provider.objects.create(
177            name=generate_id(),
178            authorization_flow=create_test_flow(),
179            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
180            signing_key=self.keypair,
181            encryption_key=self.keypair,
182        )
183        # Needs to be assigned to an application for iss to be set
184        self.app.provider = provider
185        self.app.save()
186        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
187        user = create_test_admin_user()
188        code = AuthorizationCode.objects.create(
189            code="foobar", provider=provider, user=user, auth_time=timezone.now()
190        )
191        response = self.client.post(
192            reverse("authentik_providers_oauth2:token"),
193            data={
194                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
195                "code": code.code,
196                "redirect_uri": "http://local.invalid",
197            },
198            HTTP_AUTHORIZATION=f"Basic {header}",
199        )
200        self.assertEqual(response.status_code, 200)
201        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
202        self.validate_jwe(access, provider)
203
204    @apply_blueprint("system/providers-oauth2.yaml")
205    def test_refresh_token_view(self):
206        """test request param"""
207        provider = OAuth2Provider.objects.create(
208            name=generate_id(),
209            authorization_flow=create_test_flow(),
210            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
211            signing_key=self.keypair,
212        )
213        provider.property_mappings.set(
214            ScopeMapping.objects.filter(
215                managed__in=[
216                    "goauthentik.io/providers/oauth2/scope-openid",
217                    "goauthentik.io/providers/oauth2/scope-email",
218                    "goauthentik.io/providers/oauth2/scope-profile",
219                    "goauthentik.io/providers/oauth2/scope-offline_access",
220                ]
221            )
222        )
223        # Needs to be assigned to an application for iss to be set
224        self.app.provider = provider
225        self.app.save()
226        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
227        user = create_test_admin_user()
228        token: RefreshToken = RefreshToken.objects.create(
229            provider=provider,
230            user=user,
231            token=generate_id(),
232            _id_token=dumps({}),
233            auth_time=timezone.now(),
234            _scope="offline_access",
235        )
236        response = self.client.post(
237            reverse("authentik_providers_oauth2:token"),
238            data={
239                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
240                "refresh_token": token.token,
241                "redirect_uri": "http://local.invalid",
242            },
243            HTTP_AUTHORIZATION=f"Basic {header}",
244            HTTP_ORIGIN="http://local.invalid",
245        )
246        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
247        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
248        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
249        refresh: RefreshToken = RefreshToken.objects.filter(
250            user=user, provider=provider, revoked=False
251        ).first()
252        self.assertJSONEqual(
253            response.content.decode(),
254            {
255                "access_token": access.token,
256                "refresh_token": refresh.token,
257                "token_type": TOKEN_TYPE,
258                "expires_in": 3600,
259                "id_token": provider.encode(
260                    access.id_token.to_dict(),
261                ),
262                "scope": "offline_access",
263            },
264        )
265        self.validate_jwt(access, provider)
266
267    @apply_blueprint("system/providers-oauth2.yaml")
268    def test_refresh_token_view_invalid_origin(self):
269        """test request param"""
270        provider = OAuth2Provider.objects.create(
271            name=generate_id(),
272            authorization_flow=create_test_flow(),
273            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
274            signing_key=self.keypair,
275        )
276        provider.property_mappings.set(
277            ScopeMapping.objects.filter(
278                managed__in=[
279                    "goauthentik.io/providers/oauth2/scope-openid",
280                    "goauthentik.io/providers/oauth2/scope-email",
281                    "goauthentik.io/providers/oauth2/scope-profile",
282                    "goauthentik.io/providers/oauth2/scope-offline_access",
283                ]
284            )
285        )
286        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
287        user = create_test_admin_user()
288        token: RefreshToken = RefreshToken.objects.create(
289            provider=provider,
290            user=user,
291            token=generate_id(),
292            _id_token=dumps({}),
293            auth_time=timezone.now(),
294            _scope="offline_access",
295        )
296        response = self.client.post(
297            reverse("authentik_providers_oauth2:token"),
298            data={
299                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
300                "refresh_token": token.token,
301                "redirect_uri": "http://local.invalid",
302            },
303            HTTP_AUTHORIZATION=f"Basic {header}",
304            HTTP_ORIGIN="http://another.invalid",
305        )
306        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
307        refresh: RefreshToken = RefreshToken.objects.filter(
308            user=user, provider=provider, revoked=False
309        ).first()
310        self.assertNotIn("Access-Control-Allow-Credentials", response)
311        self.assertNotIn("Access-Control-Allow-Origin", response)
312        self.assertJSONEqual(
313            response.content.decode(),
314            {
315                "access_token": access.token,
316                "refresh_token": refresh.token,
317                "token_type": TOKEN_TYPE,
318                "expires_in": 3600,
319                "id_token": provider.encode(
320                    access.id_token.to_dict(),
321                ),
322                "scope": "offline_access",
323            },
324        )
325
326    @apply_blueprint("system/providers-oauth2.yaml")
327    def test_refresh_token_revoke(self):
328        """test request param"""
329        provider = OAuth2Provider.objects.create(
330            name=generate_id(),
331            authorization_flow=create_test_flow(),
332            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
333            signing_key=self.keypair,
334        )
335        provider.property_mappings.set(
336            ScopeMapping.objects.filter(
337                managed__in=[
338                    "goauthentik.io/providers/oauth2/scope-openid",
339                    "goauthentik.io/providers/oauth2/scope-email",
340                    "goauthentik.io/providers/oauth2/scope-profile",
341                    "goauthentik.io/providers/oauth2/scope-offline_access",
342                ]
343            )
344        )
345        # Needs to be assigned to an application for iss to be set
346        self.app.provider = provider
347        self.app.save()
348        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
349        user = create_test_admin_user()
350        token: RefreshToken = RefreshToken.objects.create(
351            provider=provider,
352            user=user,
353            token=generate_id(),
354            _id_token=dumps({}),
355            auth_time=timezone.now(),
356            _scope="offline_access",
357        )
358        # Create initial refresh token
359        response = self.client.post(
360            reverse("authentik_providers_oauth2:token"),
361            data={
362                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
363                "refresh_token": token.token,
364                "redirect_uri": "http://testserver",
365            },
366            HTTP_AUTHORIZATION=f"Basic {header}",
367        )
368        new_token: RefreshToken = (
369            RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
370        )
371        # Post again with initial token -> get new refresh token
372        # and revoke old one
373        response = self.client.post(
374            reverse("authentik_providers_oauth2:token"),
375            data={
376                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
377                "refresh_token": new_token.token,
378                "redirect_uri": "http://local.invalid",
379            },
380            HTTP_AUTHORIZATION=f"Basic {header}",
381        )
382        self.assertEqual(response.status_code, 200)
383        # Post again with old token, is now revoked and should error
384        response = self.client.post(
385            reverse("authentik_providers_oauth2:token"),
386            data={
387                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
388                "refresh_token": new_token.token,
389                "redirect_uri": "http://local.invalid",
390            },
391            HTTP_AUTHORIZATION=f"Basic {header}",
392        )
393        self.assertEqual(response.status_code, 400)
394        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())
395
396    @apply_blueprint("system/providers-oauth2.yaml")
397    def test_refresh_token_view_threshold(self):
398        """test request param"""
399        provider = OAuth2Provider.objects.create(
400            name=generate_id(),
401            authorization_flow=create_test_flow(),
402            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
403            signing_key=self.keypair,
404            refresh_token_threshold="hours=1",  # nosec
405        )
406        provider.property_mappings.set(
407            ScopeMapping.objects.filter(
408                managed__in=[
409                    "goauthentik.io/providers/oauth2/scope-openid",
410                    "goauthentik.io/providers/oauth2/scope-email",
411                    "goauthentik.io/providers/oauth2/scope-profile",
412                    "goauthentik.io/providers/oauth2/scope-offline_access",
413                ]
414            )
415        )
416        # Needs to be assigned to an application for iss to be set
417        self.app.provider = provider
418        self.app.save()
419        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
420        user = create_test_admin_user()
421        token: RefreshToken = RefreshToken.objects.create(
422            provider=provider,
423            user=user,
424            token=generate_id(),
425            _id_token=dumps({}),
426            auth_time=timezone.now(),
427            _scope="offline_access",
428        )
429        response = self.client.post(
430            reverse("authentik_providers_oauth2:token"),
431            data={
432                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
433                "refresh_token": token.token,
434                "redirect_uri": "http://local.invalid",
435            },
436            HTTP_AUTHORIZATION=f"Basic {header}",
437            HTTP_ORIGIN="http://local.invalid",
438        )
439        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
440        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
441        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
442        self.assertJSONEqual(
443            response.content.decode(),
444            {
445                "access_token": access.token,
446                "token_type": TOKEN_TYPE,
447                "expires_in": 3600,
448                "id_token": provider.encode(
449                    access.id_token.to_dict(),
450                ),
451                "scope": "offline_access",
452            },
453        )
454        self.validate_jwt(access, provider)
455
456    @apply_blueprint("system/providers-oauth2.yaml")
457    def test_scope_claim_override_via_property_mapping(self):
458        """Test that property mappings can override the scope claim in access tokens.
459
460        See: https://github.com/goauthentik/authentik/issues/19224
461        """
462        # Create a custom scope mapping that returns a custom scope claim
463        custom_scope_mapping = ScopeMapping.objects.create(
464            name="custom-scope-override",
465            scope_name="custom",
466            expression='return {"scope": "custom-scope-value additional-scope"}',
467        )
468
469        provider = OAuth2Provider.objects.create(
470            name=generate_id(),
471            authorization_flow=create_test_flow(),
472            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
473            signing_key=self.keypair,
474            include_claims_in_id_token=True,
475        )
476        provider.property_mappings.add(custom_scope_mapping)
477
478        # Needs to be assigned to an application for iss to be set
479        self.app.provider = provider
480        self.app.save()
481
482        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
483        user = create_test_admin_user()
484        code = AuthorizationCode.objects.create(
485            code="foobar",
486            provider=provider,
487            user=user,
488            auth_time=timezone.now(),
489            _scope="openid custom",  # Request the custom scope
490        )
491
492        response = self.client.post(
493            reverse("authentik_providers_oauth2:token"),
494            data={
495                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
496                "code": code.code,
497                "redirect_uri": "http://local.invalid",
498            },
499            HTTP_AUTHORIZATION=f"Basic {header}",
500        )
501        self.assertEqual(response.status_code, 200)
502
503        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
504        jwt_data = self.validate_jwt(access, provider)
505
506        # The scope should be the custom value from the property mapping,
507        # not the default "openid custom"
508        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
 37class TestToken(OAuthTestCase):
 38    """Test token view"""
 39
 40    def setUp(self) -> None:
 41        super().setUp()
 42        self.factory = RequestFactory()
 43        self.app = Application.objects.create(name=generate_id(), slug="test")
 44
 45    def test_request_auth_code(self):
 46        """test request param"""
 47        provider = OAuth2Provider.objects.create(
 48            name=generate_id(),
 49            authorization_flow=create_test_flow(),
 50            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 51            signing_key=self.keypair,
 52        )
 53        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 54        user = create_test_admin_user()
 55        code = AuthorizationCode.objects.create(
 56            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 57        )
 58        request = self.factory.post(
 59            "/",
 60            data={
 61                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 62                "code": code.code,
 63                "redirect_uri": "http://TestServer",
 64            },
 65            HTTP_AUTHORIZATION=f"Basic {header}",
 66        )
 67        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 68        self.assertEqual(params.provider, provider)
 69        with self.assertRaises(TokenError):
 70            TokenParams.parse(request, provider, provider.client_id, generate_key())
 71
 72    def test_request_auth_code_invalid(self):
 73        """test request param"""
 74        provider = OAuth2Provider.objects.create(
 75            name=generate_id(),
 76            authorization_flow=create_test_flow(),
 77            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
 78            signing_key=self.keypair,
 79        )
 80        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 81        request = self.factory.post(
 82            "/",
 83            data={
 84                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 85                "code": "foo",
 86                "redirect_uri": "http://testserver",
 87            },
 88            HTTP_AUTHORIZATION=f"Basic {header}",
 89        )
 90        with self.assertRaises(TokenError):
 91            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 92
 93    def test_request_refresh_token(self):
 94        """test request param"""
 95        provider = OAuth2Provider.objects.create(
 96            name=generate_id(),
 97            authorization_flow=create_test_flow(),
 98            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
 99            signing_key=self.keypair,
100        )
101        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
102        user = create_test_admin_user()
103        token: RefreshToken = RefreshToken.objects.create(
104            provider=provider,
105            user=user,
106            token=generate_id(),
107            auth_time=timezone.now(),
108        )
109        request = self.factory.post(
110            "/",
111            data={
112                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
113                "refresh_token": token.token,
114                "redirect_uri": "http://local.invalid",
115            },
116            HTTP_AUTHORIZATION=f"Basic {header}",
117        )
118        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
119        self.assertEqual(params.provider, provider)
120
121    def test_extract_client_auth_basic_auth_percent_decodes(self):
122        """test percent-decoding of client credentials in Basic auth"""
123        header = b64encode(
124            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
125        ).decode()
126        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
127        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))
128
129    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
130        """test compatibility with clients that still send raw plus characters"""
131        header = b64encode(b"client:secret+plus").decode()
132        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
133        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))
134
135    def test_auth_code_view(self):
136        """test request param"""
137        provider = OAuth2Provider.objects.create(
138            name=generate_id(),
139            authorization_flow=create_test_flow(),
140            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
141            signing_key=self.keypair,
142        )
143        # Needs to be assigned to an application for iss to be set
144        self.app.provider = provider
145        self.app.save()
146        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
147        user = create_test_admin_user()
148        code = AuthorizationCode.objects.create(
149            code="foobar", provider=provider, user=user, auth_time=timezone.now()
150        )
151        response = self.client.post(
152            reverse("authentik_providers_oauth2:token"),
153            data={
154                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
155                "code": code.code,
156                "redirect_uri": "http://local.invalid",
157            },
158            HTTP_AUTHORIZATION=f"Basic {header}",
159        )
160        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
161        self.assertJSONEqual(
162            response.content.decode(),
163            {
164                "access_token": access.token,
165                "token_type": TOKEN_TYPE,
166                "expires_in": 3600,
167                "id_token": provider.encode(
168                    access.id_token.to_dict(),
169                ),
170                "scope": "",
171            },
172        )
173        self.validate_jwt(access, provider)
174
175    def test_auth_code_enc(self):
176        """test request param"""
177        provider = OAuth2Provider.objects.create(
178            name=generate_id(),
179            authorization_flow=create_test_flow(),
180            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
181            signing_key=self.keypair,
182            encryption_key=self.keypair,
183        )
184        # Needs to be assigned to an application for iss to be set
185        self.app.provider = provider
186        self.app.save()
187        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
188        user = create_test_admin_user()
189        code = AuthorizationCode.objects.create(
190            code="foobar", provider=provider, user=user, auth_time=timezone.now()
191        )
192        response = self.client.post(
193            reverse("authentik_providers_oauth2:token"),
194            data={
195                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
196                "code": code.code,
197                "redirect_uri": "http://local.invalid",
198            },
199            HTTP_AUTHORIZATION=f"Basic {header}",
200        )
201        self.assertEqual(response.status_code, 200)
202        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
203        self.validate_jwe(access, provider)
204
205    @apply_blueprint("system/providers-oauth2.yaml")
206    def test_refresh_token_view(self):
207        """test request param"""
208        provider = OAuth2Provider.objects.create(
209            name=generate_id(),
210            authorization_flow=create_test_flow(),
211            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
212            signing_key=self.keypair,
213        )
214        provider.property_mappings.set(
215            ScopeMapping.objects.filter(
216                managed__in=[
217                    "goauthentik.io/providers/oauth2/scope-openid",
218                    "goauthentik.io/providers/oauth2/scope-email",
219                    "goauthentik.io/providers/oauth2/scope-profile",
220                    "goauthentik.io/providers/oauth2/scope-offline_access",
221                ]
222            )
223        )
224        # Needs to be assigned to an application for iss to be set
225        self.app.provider = provider
226        self.app.save()
227        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
228        user = create_test_admin_user()
229        token: RefreshToken = RefreshToken.objects.create(
230            provider=provider,
231            user=user,
232            token=generate_id(),
233            _id_token=dumps({}),
234            auth_time=timezone.now(),
235            _scope="offline_access",
236        )
237        response = self.client.post(
238            reverse("authentik_providers_oauth2:token"),
239            data={
240                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
241                "refresh_token": token.token,
242                "redirect_uri": "http://local.invalid",
243            },
244            HTTP_AUTHORIZATION=f"Basic {header}",
245            HTTP_ORIGIN="http://local.invalid",
246        )
247        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
248        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
249        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
250        refresh: RefreshToken = RefreshToken.objects.filter(
251            user=user, provider=provider, revoked=False
252        ).first()
253        self.assertJSONEqual(
254            response.content.decode(),
255            {
256                "access_token": access.token,
257                "refresh_token": refresh.token,
258                "token_type": TOKEN_TYPE,
259                "expires_in": 3600,
260                "id_token": provider.encode(
261                    access.id_token.to_dict(),
262                ),
263                "scope": "offline_access",
264            },
265        )
266        self.validate_jwt(access, provider)
267
268    @apply_blueprint("system/providers-oauth2.yaml")
269    def test_refresh_token_view_invalid_origin(self):
270        """test request param"""
271        provider = OAuth2Provider.objects.create(
272            name=generate_id(),
273            authorization_flow=create_test_flow(),
274            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
275            signing_key=self.keypair,
276        )
277        provider.property_mappings.set(
278            ScopeMapping.objects.filter(
279                managed__in=[
280                    "goauthentik.io/providers/oauth2/scope-openid",
281                    "goauthentik.io/providers/oauth2/scope-email",
282                    "goauthentik.io/providers/oauth2/scope-profile",
283                    "goauthentik.io/providers/oauth2/scope-offline_access",
284                ]
285            )
286        )
287        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
288        user = create_test_admin_user()
289        token: RefreshToken = RefreshToken.objects.create(
290            provider=provider,
291            user=user,
292            token=generate_id(),
293            _id_token=dumps({}),
294            auth_time=timezone.now(),
295            _scope="offline_access",
296        )
297        response = self.client.post(
298            reverse("authentik_providers_oauth2:token"),
299            data={
300                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
301                "refresh_token": token.token,
302                "redirect_uri": "http://local.invalid",
303            },
304            HTTP_AUTHORIZATION=f"Basic {header}",
305            HTTP_ORIGIN="http://another.invalid",
306        )
307        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
308        refresh: RefreshToken = RefreshToken.objects.filter(
309            user=user, provider=provider, revoked=False
310        ).first()
311        self.assertNotIn("Access-Control-Allow-Credentials", response)
312        self.assertNotIn("Access-Control-Allow-Origin", response)
313        self.assertJSONEqual(
314            response.content.decode(),
315            {
316                "access_token": access.token,
317                "refresh_token": refresh.token,
318                "token_type": TOKEN_TYPE,
319                "expires_in": 3600,
320                "id_token": provider.encode(
321                    access.id_token.to_dict(),
322                ),
323                "scope": "offline_access",
324            },
325        )
326
327    @apply_blueprint("system/providers-oauth2.yaml")
328    def test_refresh_token_revoke(self):
329        """test request param"""
330        provider = OAuth2Provider.objects.create(
331            name=generate_id(),
332            authorization_flow=create_test_flow(),
333            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
334            signing_key=self.keypair,
335        )
336        provider.property_mappings.set(
337            ScopeMapping.objects.filter(
338                managed__in=[
339                    "goauthentik.io/providers/oauth2/scope-openid",
340                    "goauthentik.io/providers/oauth2/scope-email",
341                    "goauthentik.io/providers/oauth2/scope-profile",
342                    "goauthentik.io/providers/oauth2/scope-offline_access",
343                ]
344            )
345        )
346        # Needs to be assigned to an application for iss to be set
347        self.app.provider = provider
348        self.app.save()
349        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
350        user = create_test_admin_user()
351        token: RefreshToken = RefreshToken.objects.create(
352            provider=provider,
353            user=user,
354            token=generate_id(),
355            _id_token=dumps({}),
356            auth_time=timezone.now(),
357            _scope="offline_access",
358        )
359        # Create initial refresh token
360        response = self.client.post(
361            reverse("authentik_providers_oauth2:token"),
362            data={
363                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
364                "refresh_token": token.token,
365                "redirect_uri": "http://testserver",
366            },
367            HTTP_AUTHORIZATION=f"Basic {header}",
368        )
369        new_token: RefreshToken = (
370            RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
371        )
372        # Post again with initial token -> get new refresh token
373        # and revoke old one
374        response = self.client.post(
375            reverse("authentik_providers_oauth2:token"),
376            data={
377                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
378                "refresh_token": new_token.token,
379                "redirect_uri": "http://local.invalid",
380            },
381            HTTP_AUTHORIZATION=f"Basic {header}",
382        )
383        self.assertEqual(response.status_code, 200)
384        # Post again with old token, is now revoked and should error
385        response = self.client.post(
386            reverse("authentik_providers_oauth2:token"),
387            data={
388                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
389                "refresh_token": new_token.token,
390                "redirect_uri": "http://local.invalid",
391            },
392            HTTP_AUTHORIZATION=f"Basic {header}",
393        )
394        self.assertEqual(response.status_code, 400)
395        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())
396
397    @apply_blueprint("system/providers-oauth2.yaml")
398    def test_refresh_token_view_threshold(self):
399        """test request param"""
400        provider = OAuth2Provider.objects.create(
401            name=generate_id(),
402            authorization_flow=create_test_flow(),
403            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
404            signing_key=self.keypair,
405            refresh_token_threshold="hours=1",  # nosec
406        )
407        provider.property_mappings.set(
408            ScopeMapping.objects.filter(
409                managed__in=[
410                    "goauthentik.io/providers/oauth2/scope-openid",
411                    "goauthentik.io/providers/oauth2/scope-email",
412                    "goauthentik.io/providers/oauth2/scope-profile",
413                    "goauthentik.io/providers/oauth2/scope-offline_access",
414                ]
415            )
416        )
417        # Needs to be assigned to an application for iss to be set
418        self.app.provider = provider
419        self.app.save()
420        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
421        user = create_test_admin_user()
422        token: RefreshToken = RefreshToken.objects.create(
423            provider=provider,
424            user=user,
425            token=generate_id(),
426            _id_token=dumps({}),
427            auth_time=timezone.now(),
428            _scope="offline_access",
429        )
430        response = self.client.post(
431            reverse("authentik_providers_oauth2:token"),
432            data={
433                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
434                "refresh_token": token.token,
435                "redirect_uri": "http://local.invalid",
436            },
437            HTTP_AUTHORIZATION=f"Basic {header}",
438            HTTP_ORIGIN="http://local.invalid",
439        )
440        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
441        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
442        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
443        self.assertJSONEqual(
444            response.content.decode(),
445            {
446                "access_token": access.token,
447                "token_type": TOKEN_TYPE,
448                "expires_in": 3600,
449                "id_token": provider.encode(
450                    access.id_token.to_dict(),
451                ),
452                "scope": "offline_access",
453            },
454        )
455        self.validate_jwt(access, provider)
456
457    @apply_blueprint("system/providers-oauth2.yaml")
458    def test_scope_claim_override_via_property_mapping(self):
459        """Test that property mappings can override the scope claim in access tokens.
460
461        See: https://github.com/goauthentik/authentik/issues/19224
462        """
463        # Create a custom scope mapping that returns a custom scope claim
464        custom_scope_mapping = ScopeMapping.objects.create(
465            name="custom-scope-override",
466            scope_name="custom",
467            expression='return {"scope": "custom-scope-value additional-scope"}',
468        )
469
470        provider = OAuth2Provider.objects.create(
471            name=generate_id(),
472            authorization_flow=create_test_flow(),
473            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
474            signing_key=self.keypair,
475            include_claims_in_id_token=True,
476        )
477        provider.property_mappings.add(custom_scope_mapping)
478
479        # Needs to be assigned to an application for iss to be set
480        self.app.provider = provider
481        self.app.save()
482
483        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
484        user = create_test_admin_user()
485        code = AuthorizationCode.objects.create(
486            code="foobar",
487            provider=provider,
488            user=user,
489            auth_time=timezone.now(),
490            _scope="openid custom",  # Request the custom scope
491        )
492
493        response = self.client.post(
494            reverse("authentik_providers_oauth2:token"),
495            data={
496                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
497                "code": code.code,
498                "redirect_uri": "http://local.invalid",
499            },
500            HTTP_AUTHORIZATION=f"Basic {header}",
501        )
502        self.assertEqual(response.status_code, 200)
503
504        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
505        jwt_data = self.validate_jwt(access, provider)
506
507        # The scope should be the custom value from the property mapping,
508        # not the default "openid custom"
509        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")

Test token view

def setUp(self) -> None:
40    def setUp(self) -> None:
41        super().setUp()
42        self.factory = RequestFactory()
43        self.app = Application.objects.create(name=generate_id(), slug="test")

Hook method for setting up the test fixture before exercising it.

def test_request_auth_code(self):
45    def test_request_auth_code(self):
46        """test request param"""
47        provider = OAuth2Provider.objects.create(
48            name=generate_id(),
49            authorization_flow=create_test_flow(),
50            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
51            signing_key=self.keypair,
52        )
53        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
54        user = create_test_admin_user()
55        code = AuthorizationCode.objects.create(
56            code="foobar", provider=provider, user=user, auth_time=timezone.now()
57        )
58        request = self.factory.post(
59            "/",
60            data={
61                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
62                "code": code.code,
63                "redirect_uri": "http://TestServer",
64            },
65            HTTP_AUTHORIZATION=f"Basic {header}",
66        )
67        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
68        self.assertEqual(params.provider, provider)
69        with self.assertRaises(TokenError):
70            TokenParams.parse(request, provider, provider.client_id, generate_key())

test request param

def test_request_auth_code_invalid(self):
72    def test_request_auth_code_invalid(self):
73        """test request param"""
74        provider = OAuth2Provider.objects.create(
75            name=generate_id(),
76            authorization_flow=create_test_flow(),
77            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
78            signing_key=self.keypair,
79        )
80        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
81        request = self.factory.post(
82            "/",
83            data={
84                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
85                "code": "foo",
86                "redirect_uri": "http://testserver",
87            },
88            HTTP_AUTHORIZATION=f"Basic {header}",
89        )
90        with self.assertRaises(TokenError):
91            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)

test request param

def test_request_refresh_token(self):
 93    def test_request_refresh_token(self):
 94        """test request param"""
 95        provider = OAuth2Provider.objects.create(
 96            name=generate_id(),
 97            authorization_flow=create_test_flow(),
 98            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
 99            signing_key=self.keypair,
100        )
101        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
102        user = create_test_admin_user()
103        token: RefreshToken = RefreshToken.objects.create(
104            provider=provider,
105            user=user,
106            token=generate_id(),
107            auth_time=timezone.now(),
108        )
109        request = self.factory.post(
110            "/",
111            data={
112                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
113                "refresh_token": token.token,
114                "redirect_uri": "http://local.invalid",
115            },
116            HTTP_AUTHORIZATION=f"Basic {header}",
117        )
118        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
119        self.assertEqual(params.provider, provider)

test request param

def test_extract_client_auth_basic_auth_percent_decodes(self):
121    def test_extract_client_auth_basic_auth_percent_decodes(self):
122        """test percent-decoding of client credentials in Basic auth"""
123        header = b64encode(
124            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
125        ).decode()
126        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
127        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))

test percent-decoding of client credentials in Basic auth

def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
129    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
130        """test compatibility with clients that still send raw plus characters"""
131        header = b64encode(b"client:secret+plus").decode()
132        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
133        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))

test compatibility with clients that still send raw plus characters

def test_auth_code_view(self):
135    def test_auth_code_view(self):
136        """test request param"""
137        provider = OAuth2Provider.objects.create(
138            name=generate_id(),
139            authorization_flow=create_test_flow(),
140            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
141            signing_key=self.keypair,
142        )
143        # Needs to be assigned to an application for iss to be set
144        self.app.provider = provider
145        self.app.save()
146        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
147        user = create_test_admin_user()
148        code = AuthorizationCode.objects.create(
149            code="foobar", provider=provider, user=user, auth_time=timezone.now()
150        )
151        response = self.client.post(
152            reverse("authentik_providers_oauth2:token"),
153            data={
154                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
155                "code": code.code,
156                "redirect_uri": "http://local.invalid",
157            },
158            HTTP_AUTHORIZATION=f"Basic {header}",
159        )
160        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
161        self.assertJSONEqual(
162            response.content.decode(),
163            {
164                "access_token": access.token,
165                "token_type": TOKEN_TYPE,
166                "expires_in": 3600,
167                "id_token": provider.encode(
168                    access.id_token.to_dict(),
169                ),
170                "scope": "",
171            },
172        )
173        self.validate_jwt(access, provider)

test request param

def test_auth_code_enc(self):
175    def test_auth_code_enc(self):
176        """test request param"""
177        provider = OAuth2Provider.objects.create(
178            name=generate_id(),
179            authorization_flow=create_test_flow(),
180            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
181            signing_key=self.keypair,
182            encryption_key=self.keypair,
183        )
184        # Needs to be assigned to an application for iss to be set
185        self.app.provider = provider
186        self.app.save()
187        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
188        user = create_test_admin_user()
189        code = AuthorizationCode.objects.create(
190            code="foobar", provider=provider, user=user, auth_time=timezone.now()
191        )
192        response = self.client.post(
193            reverse("authentik_providers_oauth2:token"),
194            data={
195                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
196                "code": code.code,
197                "redirect_uri": "http://local.invalid",
198            },
199            HTTP_AUTHORIZATION=f"Basic {header}",
200        )
201        self.assertEqual(response.status_code, 200)
202        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
203        self.validate_jwe(access, provider)

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_view(self):
205    @apply_blueprint("system/providers-oauth2.yaml")
206    def test_refresh_token_view(self):
207        """test request param"""
208        provider = OAuth2Provider.objects.create(
209            name=generate_id(),
210            authorization_flow=create_test_flow(),
211            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
212            signing_key=self.keypair,
213        )
214        provider.property_mappings.set(
215            ScopeMapping.objects.filter(
216                managed__in=[
217                    "goauthentik.io/providers/oauth2/scope-openid",
218                    "goauthentik.io/providers/oauth2/scope-email",
219                    "goauthentik.io/providers/oauth2/scope-profile",
220                    "goauthentik.io/providers/oauth2/scope-offline_access",
221                ]
222            )
223        )
224        # Needs to be assigned to an application for iss to be set
225        self.app.provider = provider
226        self.app.save()
227        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
228        user = create_test_admin_user()
229        token: RefreshToken = RefreshToken.objects.create(
230            provider=provider,
231            user=user,
232            token=generate_id(),
233            _id_token=dumps({}),
234            auth_time=timezone.now(),
235            _scope="offline_access",
236        )
237        response = self.client.post(
238            reverse("authentik_providers_oauth2:token"),
239            data={
240                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
241                "refresh_token": token.token,
242                "redirect_uri": "http://local.invalid",
243            },
244            HTTP_AUTHORIZATION=f"Basic {header}",
245            HTTP_ORIGIN="http://local.invalid",
246        )
247        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
248        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
249        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
250        refresh: RefreshToken = RefreshToken.objects.filter(
251            user=user, provider=provider, revoked=False
252        ).first()
253        self.assertJSONEqual(
254            response.content.decode(),
255            {
256                "access_token": access.token,
257                "refresh_token": refresh.token,
258                "token_type": TOKEN_TYPE,
259                "expires_in": 3600,
260                "id_token": provider.encode(
261                    access.id_token.to_dict(),
262                ),
263                "scope": "offline_access",
264            },
265        )
266        self.validate_jwt(access, provider)

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_view_invalid_origin(self):
268    @apply_blueprint("system/providers-oauth2.yaml")
269    def test_refresh_token_view_invalid_origin(self):
270        """test request param"""
271        provider = OAuth2Provider.objects.create(
272            name=generate_id(),
273            authorization_flow=create_test_flow(),
274            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
275            signing_key=self.keypair,
276        )
277        provider.property_mappings.set(
278            ScopeMapping.objects.filter(
279                managed__in=[
280                    "goauthentik.io/providers/oauth2/scope-openid",
281                    "goauthentik.io/providers/oauth2/scope-email",
282                    "goauthentik.io/providers/oauth2/scope-profile",
283                    "goauthentik.io/providers/oauth2/scope-offline_access",
284                ]
285            )
286        )
287        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
288        user = create_test_admin_user()
289        token: RefreshToken = RefreshToken.objects.create(
290            provider=provider,
291            user=user,
292            token=generate_id(),
293            _id_token=dumps({}),
294            auth_time=timezone.now(),
295            _scope="offline_access",
296        )
297        response = self.client.post(
298            reverse("authentik_providers_oauth2:token"),
299            data={
300                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
301                "refresh_token": token.token,
302                "redirect_uri": "http://local.invalid",
303            },
304            HTTP_AUTHORIZATION=f"Basic {header}",
305            HTTP_ORIGIN="http://another.invalid",
306        )
307        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
308        refresh: RefreshToken = RefreshToken.objects.filter(
309            user=user, provider=provider, revoked=False
310        ).first()
311        self.assertNotIn("Access-Control-Allow-Credentials", response)
312        self.assertNotIn("Access-Control-Allow-Origin", response)
313        self.assertJSONEqual(
314            response.content.decode(),
315            {
316                "access_token": access.token,
317                "refresh_token": refresh.token,
318                "token_type": TOKEN_TYPE,
319                "expires_in": 3600,
320                "id_token": provider.encode(
321                    access.id_token.to_dict(),
322                ),
323                "scope": "offline_access",
324            },
325        )

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_revoke(self):
327    @apply_blueprint("system/providers-oauth2.yaml")
328    def test_refresh_token_revoke(self):
329        """test request param"""
330        provider = OAuth2Provider.objects.create(
331            name=generate_id(),
332            authorization_flow=create_test_flow(),
333            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
334            signing_key=self.keypair,
335        )
336        provider.property_mappings.set(
337            ScopeMapping.objects.filter(
338                managed__in=[
339                    "goauthentik.io/providers/oauth2/scope-openid",
340                    "goauthentik.io/providers/oauth2/scope-email",
341                    "goauthentik.io/providers/oauth2/scope-profile",
342                    "goauthentik.io/providers/oauth2/scope-offline_access",
343                ]
344            )
345        )
346        # Needs to be assigned to an application for iss to be set
347        self.app.provider = provider
348        self.app.save()
349        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
350        user = create_test_admin_user()
351        token: RefreshToken = RefreshToken.objects.create(
352            provider=provider,
353            user=user,
354            token=generate_id(),
355            _id_token=dumps({}),
356            auth_time=timezone.now(),
357            _scope="offline_access",
358        )
359        # Create initial refresh token
360        response = self.client.post(
361            reverse("authentik_providers_oauth2:token"),
362            data={
363                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
364                "refresh_token": token.token,
365                "redirect_uri": "http://testserver",
366            },
367            HTTP_AUTHORIZATION=f"Basic {header}",
368        )
369        new_token: RefreshToken = (
370            RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
371        )
372        # Post again with initial token -> get new refresh token
373        # and revoke old one
374        response = self.client.post(
375            reverse("authentik_providers_oauth2:token"),
376            data={
377                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
378                "refresh_token": new_token.token,
379                "redirect_uri": "http://local.invalid",
380            },
381            HTTP_AUTHORIZATION=f"Basic {header}",
382        )
383        self.assertEqual(response.status_code, 200)
384        # Post again with old token, is now revoked and should error
385        response = self.client.post(
386            reverse("authentik_providers_oauth2:token"),
387            data={
388                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
389                "refresh_token": new_token.token,
390                "redirect_uri": "http://local.invalid",
391            },
392            HTTP_AUTHORIZATION=f"Basic {header}",
393        )
394        self.assertEqual(response.status_code, 400)
395        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_view_threshold(self):
397    @apply_blueprint("system/providers-oauth2.yaml")
398    def test_refresh_token_view_threshold(self):
399        """test request param"""
400        provider = OAuth2Provider.objects.create(
401            name=generate_id(),
402            authorization_flow=create_test_flow(),
403            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
404            signing_key=self.keypair,
405            refresh_token_threshold="hours=1",  # nosec
406        )
407        provider.property_mappings.set(
408            ScopeMapping.objects.filter(
409                managed__in=[
410                    "goauthentik.io/providers/oauth2/scope-openid",
411                    "goauthentik.io/providers/oauth2/scope-email",
412                    "goauthentik.io/providers/oauth2/scope-profile",
413                    "goauthentik.io/providers/oauth2/scope-offline_access",
414                ]
415            )
416        )
417        # Needs to be assigned to an application for iss to be set
418        self.app.provider = provider
419        self.app.save()
420        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
421        user = create_test_admin_user()
422        token: RefreshToken = RefreshToken.objects.create(
423            provider=provider,
424            user=user,
425            token=generate_id(),
426            _id_token=dumps({}),
427            auth_time=timezone.now(),
428            _scope="offline_access",
429        )
430        response = self.client.post(
431            reverse("authentik_providers_oauth2:token"),
432            data={
433                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
434                "refresh_token": token.token,
435                "redirect_uri": "http://local.invalid",
436            },
437            HTTP_AUTHORIZATION=f"Basic {header}",
438            HTTP_ORIGIN="http://local.invalid",
439        )
440        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
441        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
442        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
443        self.assertJSONEqual(
444            response.content.decode(),
445            {
446                "access_token": access.token,
447                "token_type": TOKEN_TYPE,
448                "expires_in": 3600,
449                "id_token": provider.encode(
450                    access.id_token.to_dict(),
451                ),
452                "scope": "offline_access",
453            },
454        )
455        self.validate_jwt(access, provider)

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_scope_claim_override_via_property_mapping(self):
457    @apply_blueprint("system/providers-oauth2.yaml")
458    def test_scope_claim_override_via_property_mapping(self):
459        """Test that property mappings can override the scope claim in access tokens.
460
461        See: https://github.com/goauthentik/authentik/issues/19224
462        """
463        # Create a custom scope mapping that returns a custom scope claim
464        custom_scope_mapping = ScopeMapping.objects.create(
465            name="custom-scope-override",
466            scope_name="custom",
467            expression='return {"scope": "custom-scope-value additional-scope"}',
468        )
469
470        provider = OAuth2Provider.objects.create(
471            name=generate_id(),
472            authorization_flow=create_test_flow(),
473            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
474            signing_key=self.keypair,
475            include_claims_in_id_token=True,
476        )
477        provider.property_mappings.add(custom_scope_mapping)
478
479        # Needs to be assigned to an application for iss to be set
480        self.app.provider = provider
481        self.app.save()
482
483        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
484        user = create_test_admin_user()
485        code = AuthorizationCode.objects.create(
486            code="foobar",
487            provider=provider,
488            user=user,
489            auth_time=timezone.now(),
490            _scope="openid custom",  # Request the custom scope
491        )
492
493        response = self.client.post(
494            reverse("authentik_providers_oauth2:token"),
495            data={
496                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
497                "code": code.code,
498                "redirect_uri": "http://local.invalid",
499            },
500            HTTP_AUTHORIZATION=f"Basic {header}",
501        )
502        self.assertEqual(response.status_code, 200)
503
504        access: AccessToken = AccessToken.objects.filter(user=user, provider=provider).first()
505        jwt_data = self.validate_jwt(access, provider)
506
507        # The scope should be the custom value from the property mapping,
508        # not the default "openid custom"
509        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")

Test that property mappings can override the scope claim in access tokens.

See: https://github.com/goauthentik/authentik/issues/19224