authentik.providers.oauth2.tests.test_token

Test token view

  1"""Test token view"""
  2
  3from base64 import b64encode
  4from datetime import timedelta
  5from json import dumps
  6from urllib.parse import quote
  7
  8from django.test import RequestFactory
  9from django.urls import reverse
 10from django.utils import timezone
 11from django.utils.timezone import now
 12from freezegun import freeze_time
 13
 14from authentik.blueprints.tests import apply_blueprint
 15from authentik.common.oauth.constants import (
 16    GRANT_TYPE_AUTHORIZATION_CODE,
 17    GRANT_TYPE_REFRESH_TOKEN,
 18    TOKEN_TYPE,
 19)
 20from authentik.core.models import Application
 21from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 22from authentik.events.models import Event, EventAction
 23from authentik.lib.generators import generate_id, generate_key
 24from authentik.providers.oauth2.errors import TokenError
 25from authentik.providers.oauth2.models import (
 26    AccessToken,
 27    AuthorizationCode,
 28    GrantType,
 29    OAuth2Provider,
 30    RedirectURI,
 31    RedirectURIMatchingMode,
 32    RefreshToken,
 33    ScopeMapping,
 34)
 35from authentik.providers.oauth2.tests.utils import OAuthTestCase
 36from authentik.providers.oauth2.utils import extract_client_auth
 37from authentik.providers.oauth2.views.token import TokenParams
 38
 39
 40class TestToken(OAuthTestCase):
 41    """Test token view"""
 42
 43    def setUp(self) -> None:
 44        super().setUp()
 45        self.factory = RequestFactory()
 46        self.app = Application.objects.create(name=generate_id(), slug="test")
 47
 48    def test_invalid_grant_type(self):
 49        """test request param"""
 50        provider = OAuth2Provider.objects.create(
 51            name=generate_id(),
 52            authorization_flow=create_test_flow(),
 53            grant_types=[],
 54            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 55            signing_key=self.keypair,
 56        )
 57        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 58        user = create_test_admin_user()
 59        code = AuthorizationCode.objects.create(
 60            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 61        )
 62        request = self.factory.post(
 63            "/",
 64            data={
 65                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 66                "code": code.code,
 67                "redirect_uri": "http://TestServer",
 68            },
 69            HTTP_AUTHORIZATION=f"Basic {header}",
 70        )
 71        with self.assertRaises(TokenError) as cm:
 72            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 73        self.assertEqual(cm.exception.cause, "grant_type_not_configured")
 74
 75    def test_request_auth_code(self):
 76        """test request param"""
 77        provider = OAuth2Provider.objects.create(
 78            name=generate_id(),
 79            authorization_flow=create_test_flow(),
 80            grant_types=[GrantType.AUTHORIZATION_CODE],
 81            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 82            signing_key=self.keypair,
 83        )
 84        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 85        user = create_test_admin_user()
 86        code = AuthorizationCode.objects.create(
 87            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 88        )
 89        request = self.factory.post(
 90            "/",
 91            data={
 92                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 93                "code": code.code,
 94                "redirect_uri": "http://TestServer",
 95            },
 96            HTTP_AUTHORIZATION=f"Basic {header}",
 97        )
 98        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 99        self.assertEqual(params.provider, provider)
100        with self.assertRaises(TokenError):
101            TokenParams.parse(request, provider, provider.client_id, generate_key())
102
103    def test_request_auth_code_invalid(self):
104        """test request param"""
105        provider = OAuth2Provider.objects.create(
106            name=generate_id(),
107            authorization_flow=create_test_flow(),
108            grant_types=[GrantType.REFRESH_TOKEN],
109            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
110            signing_key=self.keypair,
111        )
112        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
113        request = self.factory.post(
114            "/",
115            data={
116                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
117                "code": "foo",
118                "redirect_uri": "http://testserver",
119            },
120            HTTP_AUTHORIZATION=f"Basic {header}",
121        )
122        with self.assertRaises(TokenError):
123            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
124
125    def test_request_refresh_token(self):
126        """test request param"""
127        provider = OAuth2Provider.objects.create(
128            name=generate_id(),
129            authorization_flow=create_test_flow(),
130            grant_types=[GrantType.REFRESH_TOKEN],
131            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
132            signing_key=self.keypair,
133        )
134        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
135        user = create_test_admin_user()
136        token = RefreshToken.objects.create(
137            provider=provider,
138            user=user,
139            token=generate_id(),
140            auth_time=timezone.now(),
141        )
142        request = self.factory.post(
143            "/",
144            data={
145                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
146                "refresh_token": token.token,
147                "redirect_uri": "http://local.invalid",
148            },
149            HTTP_AUTHORIZATION=f"Basic {header}",
150        )
151        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
152        self.assertEqual(params.provider, provider)
153
154    def test_extract_client_auth_basic_auth_percent_decodes(self):
155        """test percent-decoding of client credentials in Basic auth"""
156        header = b64encode(
157            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
158        ).decode()
159        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
160        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))
161
162    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
163        """test compatibility with clients that still send raw plus characters"""
164        header = b64encode(b"client:secret+plus").decode()
165        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
166        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))
167
168    def test_auth_code_view(self):
169        """test request param"""
170        provider = OAuth2Provider.objects.create(
171            name=generate_id(),
172            authorization_flow=create_test_flow(),
173            grant_types=[GrantType.AUTHORIZATION_CODE],
174            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
175            signing_key=self.keypair,
176        )
177        # Needs to be assigned to an application for iss to be set
178        self.app.provider = provider
179        self.app.save()
180        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
181        user = create_test_admin_user()
182        code = AuthorizationCode.objects.create(
183            code="foobar", provider=provider, user=user, auth_time=timezone.now()
184        )
185        response = self.client.post(
186            reverse("authentik_providers_oauth2:token"),
187            data={
188                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
189                "code": code.code,
190                "redirect_uri": "http://local.invalid",
191            },
192            HTTP_AUTHORIZATION=f"Basic {header}",
193        )
194        access = AccessToken.objects.filter(user=user, provider=provider).first()
195        self.assertJSONEqual(
196            response.content.decode(),
197            {
198                "access_token": access.token,
199                "token_type": TOKEN_TYPE,
200                "expires_in": 3600,
201                "id_token": provider.encode(
202                    access.id_token.to_dict(),
203                ),
204                "scope": "",
205            },
206        )
207        self.validate_jwt(access, provider)
208
209    def test_auth_code_enc(self):
210        """test request param"""
211        provider = OAuth2Provider.objects.create(
212            name=generate_id(),
213            authorization_flow=create_test_flow(),
214            grant_types=[GrantType.AUTHORIZATION_CODE],
215            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
216            signing_key=self.keypair,
217            encryption_key=self.keypair,
218        )
219        # Needs to be assigned to an application for iss to be set
220        self.app.provider = provider
221        self.app.save()
222        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
223        user = create_test_admin_user()
224        code = AuthorizationCode.objects.create(
225            code="foobar", provider=provider, user=user, auth_time=timezone.now()
226        )
227        response = self.client.post(
228            reverse("authentik_providers_oauth2:token"),
229            data={
230                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
231                "code": code.code,
232                "redirect_uri": "http://local.invalid",
233            },
234            HTTP_AUTHORIZATION=f"Basic {header}",
235        )
236        self.assertEqual(response.status_code, 200)
237        access = AccessToken.objects.filter(user=user, provider=provider).first()
238        self.validate_jwe(access, provider)
239
240    @apply_blueprint("system/providers-oauth2.yaml")
241    def test_refresh_token_view(self):
242        """test request param"""
243        provider = OAuth2Provider.objects.create(
244            name=generate_id(),
245            authorization_flow=create_test_flow(),
246            grant_types=[GrantType.REFRESH_TOKEN],
247            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
248            signing_key=self.keypair,
249        )
250        provider.property_mappings.set(
251            ScopeMapping.objects.filter(
252                managed__in=[
253                    "goauthentik.io/providers/oauth2/scope-openid",
254                    "goauthentik.io/providers/oauth2/scope-email",
255                    "goauthentik.io/providers/oauth2/scope-profile",
256                    "goauthentik.io/providers/oauth2/scope-offline_access",
257                ]
258            )
259        )
260        # Needs to be assigned to an application for iss to be set
261        self.app.provider = provider
262        self.app.save()
263        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
264        user = create_test_admin_user()
265        token = RefreshToken.objects.create(
266            provider=provider,
267            user=user,
268            token=generate_id(),
269            _id_token=dumps({}),
270            auth_time=timezone.now(),
271            _scope="offline_access",
272        )
273        response = self.client.post(
274            reverse("authentik_providers_oauth2:token"),
275            data={
276                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
277                "refresh_token": token.token,
278                "redirect_uri": "http://local.invalid",
279            },
280            HTTP_AUTHORIZATION=f"Basic {header}",
281            HTTP_ORIGIN="http://local.invalid",
282        )
283        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
284        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
285        access = AccessToken.objects.filter(user=user, provider=provider).first()
286        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
287        self.assertJSONEqual(
288            response.content.decode(),
289            {
290                "access_token": access.token,
291                "refresh_token": refresh.token,
292                "token_type": TOKEN_TYPE,
293                "expires_in": 3600,
294                "id_token": provider.encode(
295                    access.id_token.to_dict(),
296                ),
297                "scope": "offline_access",
298            },
299        )
300        self.validate_jwt(access, provider)
301
302    @apply_blueprint("system/providers-oauth2.yaml")
303    def test_refresh_token_view_invalid_origin(self):
304        """test request param"""
305        provider = OAuth2Provider.objects.create(
306            name=generate_id(),
307            authorization_flow=create_test_flow(),
308            grant_types=[GrantType.REFRESH_TOKEN],
309            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
310            signing_key=self.keypair,
311        )
312        provider.property_mappings.set(
313            ScopeMapping.objects.filter(
314                managed__in=[
315                    "goauthentik.io/providers/oauth2/scope-openid",
316                    "goauthentik.io/providers/oauth2/scope-email",
317                    "goauthentik.io/providers/oauth2/scope-profile",
318                    "goauthentik.io/providers/oauth2/scope-offline_access",
319                ]
320            )
321        )
322        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
323        user = create_test_admin_user()
324        token = RefreshToken.objects.create(
325            provider=provider,
326            user=user,
327            token=generate_id(),
328            _id_token=dumps({}),
329            auth_time=timezone.now(),
330            _scope="offline_access",
331        )
332        response = self.client.post(
333            reverse("authentik_providers_oauth2:token"),
334            data={
335                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
336                "refresh_token": token.token,
337                "redirect_uri": "http://local.invalid",
338            },
339            HTTP_AUTHORIZATION=f"Basic {header}",
340            HTTP_ORIGIN="http://another.invalid",
341        )
342        access = AccessToken.objects.filter(user=user, provider=provider).first()
343        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
344        self.assertNotIn("Access-Control-Allow-Credentials", response)
345        self.assertNotIn("Access-Control-Allow-Origin", response)
346        self.assertJSONEqual(
347            response.content.decode(),
348            {
349                "access_token": access.token,
350                "refresh_token": refresh.token,
351                "token_type": TOKEN_TYPE,
352                "expires_in": 3600,
353                "id_token": provider.encode(
354                    access.id_token.to_dict(),
355                ),
356                "scope": "offline_access",
357            },
358        )
359
360    @apply_blueprint("system/providers-oauth2.yaml")
361    def test_refresh_token_revoke(self):
362        """test request param"""
363        provider = OAuth2Provider.objects.create(
364            name=generate_id(),
365            authorization_flow=create_test_flow(),
366            grant_types=[GrantType.REFRESH_TOKEN],
367            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
368            signing_key=self.keypair,
369        )
370        provider.property_mappings.set(
371            ScopeMapping.objects.filter(
372                managed__in=[
373                    "goauthentik.io/providers/oauth2/scope-openid",
374                    "goauthentik.io/providers/oauth2/scope-email",
375                    "goauthentik.io/providers/oauth2/scope-profile",
376                    "goauthentik.io/providers/oauth2/scope-offline_access",
377                ]
378            )
379        )
380        # Needs to be assigned to an application for iss to be set
381        self.app.provider = provider
382        self.app.save()
383        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
384        user = create_test_admin_user()
385        token = RefreshToken.objects.create(
386            provider=provider,
387            user=user,
388            token=generate_id(),
389            _id_token=dumps({}),
390            auth_time=timezone.now(),
391            _scope="offline_access",
392        )
393        # Create initial refresh token
394        response = self.client.post(
395            reverse("authentik_providers_oauth2:token"),
396            data={
397                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
398                "refresh_token": token.token,
399                "redirect_uri": "http://testserver",
400            },
401            HTTP_AUTHORIZATION=f"Basic {header}",
402        )
403        new_token = RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
404        # Post again with initial token -> get new refresh token
405        # and revoke old one
406        response = self.client.post(
407            reverse("authentik_providers_oauth2:token"),
408            data={
409                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
410                "refresh_token": new_token.token,
411                "redirect_uri": "http://local.invalid",
412            },
413            HTTP_AUTHORIZATION=f"Basic {header}",
414        )
415        self.assertEqual(response.status_code, 200)
416        # Post again with old token, is now revoked and should error
417        response = self.client.post(
418            reverse("authentik_providers_oauth2:token"),
419            data={
420                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
421                "refresh_token": new_token.token,
422                "redirect_uri": "http://local.invalid",
423            },
424            HTTP_AUTHORIZATION=f"Basic {header}",
425        )
426        self.assertEqual(response.status_code, 400)
427        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())
428
429    @apply_blueprint("system/providers-oauth2.yaml")
430    def test_refresh_token_view_threshold(self):
431        """refresh token threshold
432
433        threshold set to 1 hour, refresh token expires in 2 hours.
434        First request should not return a new refresh token, second request
435        has a fake time 1 hours in the future which should return a new access token"""
436        provider = OAuth2Provider.objects.create(
437            name=generate_id(),
438            authorization_flow=create_test_flow(),
439            grant_types=[GrantType.REFRESH_TOKEN],
440            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
441            signing_key=self.keypair,
442            refresh_token_threshold="hours=1",  # nosec
443        )
444        provider.property_mappings.set(
445            ScopeMapping.objects.filter(
446                managed__in=[
447                    "goauthentik.io/providers/oauth2/scope-openid",
448                    "goauthentik.io/providers/oauth2/scope-email",
449                    "goauthentik.io/providers/oauth2/scope-profile",
450                    "goauthentik.io/providers/oauth2/scope-offline_access",
451                ]
452            )
453        )
454        # Needs to be assigned to an application for iss to be set
455        self.app.provider = provider
456        self.app.save()
457        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
458        user = create_test_admin_user()
459        token = RefreshToken.objects.create(
460            provider=provider,
461            user=user,
462            token=generate_id(),
463            _id_token=dumps({}),
464            auth_time=timezone.now(),
465            _scope="offline_access",
466            expires=now() + timedelta(hours=2),
467        )
468        response = self.client.post(
469            reverse("authentik_providers_oauth2:token"),
470            data={
471                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
472                "refresh_token": token.token,
473                "redirect_uri": "http://local.invalid",
474            },
475            HTTP_AUTHORIZATION=f"Basic {header}",
476            HTTP_ORIGIN="http://local.invalid",
477        )
478        access = AccessToken.objects.filter(user=user, provider=provider).first()
479        self.assertJSONEqual(
480            response.content.decode(),
481            {
482                "access_token": access.token,
483                "token_type": TOKEN_TYPE,
484                "expires_in": 3600,
485                "id_token": provider.encode(
486                    access.id_token.to_dict(),
487                ),
488                "scope": "offline_access",
489            },
490        )
491        self.validate_jwt(access, provider)
492
493        with freeze_time(now() + timedelta(hours=1, minutes=10)):
494            response = self.client.post(
495                reverse("authentik_providers_oauth2:token"),
496                data={
497                    "grant_type": GRANT_TYPE_REFRESH_TOKEN,
498                    "refresh_token": token.token,
499                    "redirect_uri": "http://local.invalid",
500                },
501                HTTP_AUTHORIZATION=f"Basic {header}",
502                HTTP_ORIGIN="http://local.invalid",
503            )
504            access = AccessToken.objects.filter(user=user, provider=provider).first()
505            refresh = RefreshToken.objects.filter(user=user, provider=provider).last()
506            self.assertJSONEqual(
507                response.content.decode(),
508                {
509                    "access_token": access.token,
510                    "token_type": TOKEN_TYPE,
511                    "expires_in": 3600,
512                    "id_token": provider.encode(
513                        access.id_token.to_dict(),
514                    ),
515                    "scope": "offline_access",
516                    "refresh_token": refresh.token,
517                },
518            )
519            self.validate_jwt(access, provider)
520
521    @apply_blueprint("system/providers-oauth2.yaml")
522    def test_scope_claim_override_via_property_mapping(self):
523        """Test that property mappings can override the scope claim in access tokens.
524
525        See: https://github.com/goauthentik/authentik/issues/19224
526        """
527        # Create a custom scope mapping that returns a custom scope claim
528        custom_scope_mapping = ScopeMapping.objects.create(
529            name="custom-scope-override",
530            scope_name="custom",
531            expression='return {"scope": "custom-scope-value additional-scope"}',
532        )
533
534        provider = OAuth2Provider.objects.create(
535            name=generate_id(),
536            authorization_flow=create_test_flow(),
537            grant_types=[GrantType.AUTHORIZATION_CODE],
538            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
539            signing_key=self.keypair,
540            include_claims_in_id_token=True,
541        )
542        provider.property_mappings.add(custom_scope_mapping)
543
544        # Needs to be assigned to an application for iss to be set
545        self.app.provider = provider
546        self.app.save()
547
548        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
549        user = create_test_admin_user()
550        code = AuthorizationCode.objects.create(
551            code="foobar",
552            provider=provider,
553            user=user,
554            auth_time=timezone.now(),
555            _scope="openid custom",  # Request the custom scope
556        )
557
558        response = self.client.post(
559            reverse("authentik_providers_oauth2:token"),
560            data={
561                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
562                "code": code.code,
563                "redirect_uri": "http://local.invalid",
564            },
565            HTTP_AUTHORIZATION=f"Basic {header}",
566        )
567        self.assertEqual(response.status_code, 200)
568
569        access = AccessToken.objects.filter(user=user, provider=provider).first()
570        jwt_data = self.validate_jwt(access, provider)
571
572        # The scope should be the custom value from the property mapping,
573        # not the default "openid custom"
574        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")
 41class TestToken(OAuthTestCase):
 42    """Test token view"""
 43
 44    def setUp(self) -> None:
 45        super().setUp()
 46        self.factory = RequestFactory()
 47        self.app = Application.objects.create(name=generate_id(), slug="test")
 48
 49    def test_invalid_grant_type(self):
 50        """test request param"""
 51        provider = OAuth2Provider.objects.create(
 52            name=generate_id(),
 53            authorization_flow=create_test_flow(),
 54            grant_types=[],
 55            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 56            signing_key=self.keypair,
 57        )
 58        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 59        user = create_test_admin_user()
 60        code = AuthorizationCode.objects.create(
 61            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 62        )
 63        request = self.factory.post(
 64            "/",
 65            data={
 66                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 67                "code": code.code,
 68                "redirect_uri": "http://TestServer",
 69            },
 70            HTTP_AUTHORIZATION=f"Basic {header}",
 71        )
 72        with self.assertRaises(TokenError) as cm:
 73            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
 74        self.assertEqual(cm.exception.cause, "grant_type_not_configured")
 75
 76    def test_request_auth_code(self):
 77        """test request param"""
 78        provider = OAuth2Provider.objects.create(
 79            name=generate_id(),
 80            authorization_flow=create_test_flow(),
 81            grant_types=[GrantType.AUTHORIZATION_CODE],
 82            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 83            signing_key=self.keypair,
 84        )
 85        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 86        user = create_test_admin_user()
 87        code = AuthorizationCode.objects.create(
 88            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 89        )
 90        request = self.factory.post(
 91            "/",
 92            data={
 93                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 94                "code": code.code,
 95                "redirect_uri": "http://TestServer",
 96            },
 97            HTTP_AUTHORIZATION=f"Basic {header}",
 98        )
 99        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
100        self.assertEqual(params.provider, provider)
101        with self.assertRaises(TokenError):
102            TokenParams.parse(request, provider, provider.client_id, generate_key())
103
104    def test_request_auth_code_invalid(self):
105        """test request param"""
106        provider = OAuth2Provider.objects.create(
107            name=generate_id(),
108            authorization_flow=create_test_flow(),
109            grant_types=[GrantType.REFRESH_TOKEN],
110            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
111            signing_key=self.keypair,
112        )
113        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
114        request = self.factory.post(
115            "/",
116            data={
117                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
118                "code": "foo",
119                "redirect_uri": "http://testserver",
120            },
121            HTTP_AUTHORIZATION=f"Basic {header}",
122        )
123        with self.assertRaises(TokenError):
124            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
125
126    def test_request_refresh_token(self):
127        """test request param"""
128        provider = OAuth2Provider.objects.create(
129            name=generate_id(),
130            authorization_flow=create_test_flow(),
131            grant_types=[GrantType.REFRESH_TOKEN],
132            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
133            signing_key=self.keypair,
134        )
135        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
136        user = create_test_admin_user()
137        token = RefreshToken.objects.create(
138            provider=provider,
139            user=user,
140            token=generate_id(),
141            auth_time=timezone.now(),
142        )
143        request = self.factory.post(
144            "/",
145            data={
146                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
147                "refresh_token": token.token,
148                "redirect_uri": "http://local.invalid",
149            },
150            HTTP_AUTHORIZATION=f"Basic {header}",
151        )
152        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
153        self.assertEqual(params.provider, provider)
154
155    def test_extract_client_auth_basic_auth_percent_decodes(self):
156        """test percent-decoding of client credentials in Basic auth"""
157        header = b64encode(
158            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
159        ).decode()
160        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
161        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))
162
163    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
164        """test compatibility with clients that still send raw plus characters"""
165        header = b64encode(b"client:secret+plus").decode()
166        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
167        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))
168
169    def test_auth_code_view(self):
170        """test request param"""
171        provider = OAuth2Provider.objects.create(
172            name=generate_id(),
173            authorization_flow=create_test_flow(),
174            grant_types=[GrantType.AUTHORIZATION_CODE],
175            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
176            signing_key=self.keypair,
177        )
178        # Needs to be assigned to an application for iss to be set
179        self.app.provider = provider
180        self.app.save()
181        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
182        user = create_test_admin_user()
183        code = AuthorizationCode.objects.create(
184            code="foobar", provider=provider, user=user, auth_time=timezone.now()
185        )
186        response = self.client.post(
187            reverse("authentik_providers_oauth2:token"),
188            data={
189                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
190                "code": code.code,
191                "redirect_uri": "http://local.invalid",
192            },
193            HTTP_AUTHORIZATION=f"Basic {header}",
194        )
195        access = AccessToken.objects.filter(user=user, provider=provider).first()
196        self.assertJSONEqual(
197            response.content.decode(),
198            {
199                "access_token": access.token,
200                "token_type": TOKEN_TYPE,
201                "expires_in": 3600,
202                "id_token": provider.encode(
203                    access.id_token.to_dict(),
204                ),
205                "scope": "",
206            },
207        )
208        self.validate_jwt(access, provider)
209
210    def test_auth_code_enc(self):
211        """test request param"""
212        provider = OAuth2Provider.objects.create(
213            name=generate_id(),
214            authorization_flow=create_test_flow(),
215            grant_types=[GrantType.AUTHORIZATION_CODE],
216            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
217            signing_key=self.keypair,
218            encryption_key=self.keypair,
219        )
220        # Needs to be assigned to an application for iss to be set
221        self.app.provider = provider
222        self.app.save()
223        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
224        user = create_test_admin_user()
225        code = AuthorizationCode.objects.create(
226            code="foobar", provider=provider, user=user, auth_time=timezone.now()
227        )
228        response = self.client.post(
229            reverse("authentik_providers_oauth2:token"),
230            data={
231                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
232                "code": code.code,
233                "redirect_uri": "http://local.invalid",
234            },
235            HTTP_AUTHORIZATION=f"Basic {header}",
236        )
237        self.assertEqual(response.status_code, 200)
238        access = AccessToken.objects.filter(user=user, provider=provider).first()
239        self.validate_jwe(access, provider)
240
241    @apply_blueprint("system/providers-oauth2.yaml")
242    def test_refresh_token_view(self):
243        """test request param"""
244        provider = OAuth2Provider.objects.create(
245            name=generate_id(),
246            authorization_flow=create_test_flow(),
247            grant_types=[GrantType.REFRESH_TOKEN],
248            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
249            signing_key=self.keypair,
250        )
251        provider.property_mappings.set(
252            ScopeMapping.objects.filter(
253                managed__in=[
254                    "goauthentik.io/providers/oauth2/scope-openid",
255                    "goauthentik.io/providers/oauth2/scope-email",
256                    "goauthentik.io/providers/oauth2/scope-profile",
257                    "goauthentik.io/providers/oauth2/scope-offline_access",
258                ]
259            )
260        )
261        # Needs to be assigned to an application for iss to be set
262        self.app.provider = provider
263        self.app.save()
264        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
265        user = create_test_admin_user()
266        token = RefreshToken.objects.create(
267            provider=provider,
268            user=user,
269            token=generate_id(),
270            _id_token=dumps({}),
271            auth_time=timezone.now(),
272            _scope="offline_access",
273        )
274        response = self.client.post(
275            reverse("authentik_providers_oauth2:token"),
276            data={
277                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
278                "refresh_token": token.token,
279                "redirect_uri": "http://local.invalid",
280            },
281            HTTP_AUTHORIZATION=f"Basic {header}",
282            HTTP_ORIGIN="http://local.invalid",
283        )
284        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
285        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
286        access = AccessToken.objects.filter(user=user, provider=provider).first()
287        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
288        self.assertJSONEqual(
289            response.content.decode(),
290            {
291                "access_token": access.token,
292                "refresh_token": refresh.token,
293                "token_type": TOKEN_TYPE,
294                "expires_in": 3600,
295                "id_token": provider.encode(
296                    access.id_token.to_dict(),
297                ),
298                "scope": "offline_access",
299            },
300        )
301        self.validate_jwt(access, provider)
302
303    @apply_blueprint("system/providers-oauth2.yaml")
304    def test_refresh_token_view_invalid_origin(self):
305        """test request param"""
306        provider = OAuth2Provider.objects.create(
307            name=generate_id(),
308            authorization_flow=create_test_flow(),
309            grant_types=[GrantType.REFRESH_TOKEN],
310            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
311            signing_key=self.keypair,
312        )
313        provider.property_mappings.set(
314            ScopeMapping.objects.filter(
315                managed__in=[
316                    "goauthentik.io/providers/oauth2/scope-openid",
317                    "goauthentik.io/providers/oauth2/scope-email",
318                    "goauthentik.io/providers/oauth2/scope-profile",
319                    "goauthentik.io/providers/oauth2/scope-offline_access",
320                ]
321            )
322        )
323        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
324        user = create_test_admin_user()
325        token = RefreshToken.objects.create(
326            provider=provider,
327            user=user,
328            token=generate_id(),
329            _id_token=dumps({}),
330            auth_time=timezone.now(),
331            _scope="offline_access",
332        )
333        response = self.client.post(
334            reverse("authentik_providers_oauth2:token"),
335            data={
336                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
337                "refresh_token": token.token,
338                "redirect_uri": "http://local.invalid",
339            },
340            HTTP_AUTHORIZATION=f"Basic {header}",
341            HTTP_ORIGIN="http://another.invalid",
342        )
343        access = AccessToken.objects.filter(user=user, provider=provider).first()
344        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
345        self.assertNotIn("Access-Control-Allow-Credentials", response)
346        self.assertNotIn("Access-Control-Allow-Origin", response)
347        self.assertJSONEqual(
348            response.content.decode(),
349            {
350                "access_token": access.token,
351                "refresh_token": refresh.token,
352                "token_type": TOKEN_TYPE,
353                "expires_in": 3600,
354                "id_token": provider.encode(
355                    access.id_token.to_dict(),
356                ),
357                "scope": "offline_access",
358            },
359        )
360
361    @apply_blueprint("system/providers-oauth2.yaml")
362    def test_refresh_token_revoke(self):
363        """test request param"""
364        provider = OAuth2Provider.objects.create(
365            name=generate_id(),
366            authorization_flow=create_test_flow(),
367            grant_types=[GrantType.REFRESH_TOKEN],
368            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
369            signing_key=self.keypair,
370        )
371        provider.property_mappings.set(
372            ScopeMapping.objects.filter(
373                managed__in=[
374                    "goauthentik.io/providers/oauth2/scope-openid",
375                    "goauthentik.io/providers/oauth2/scope-email",
376                    "goauthentik.io/providers/oauth2/scope-profile",
377                    "goauthentik.io/providers/oauth2/scope-offline_access",
378                ]
379            )
380        )
381        # Needs to be assigned to an application for iss to be set
382        self.app.provider = provider
383        self.app.save()
384        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
385        user = create_test_admin_user()
386        token = RefreshToken.objects.create(
387            provider=provider,
388            user=user,
389            token=generate_id(),
390            _id_token=dumps({}),
391            auth_time=timezone.now(),
392            _scope="offline_access",
393        )
394        # Create initial refresh token
395        response = self.client.post(
396            reverse("authentik_providers_oauth2:token"),
397            data={
398                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
399                "refresh_token": token.token,
400                "redirect_uri": "http://testserver",
401            },
402            HTTP_AUTHORIZATION=f"Basic {header}",
403        )
404        new_token = RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
405        # Post again with initial token -> get new refresh token
406        # and revoke old one
407        response = self.client.post(
408            reverse("authentik_providers_oauth2:token"),
409            data={
410                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
411                "refresh_token": new_token.token,
412                "redirect_uri": "http://local.invalid",
413            },
414            HTTP_AUTHORIZATION=f"Basic {header}",
415        )
416        self.assertEqual(response.status_code, 200)
417        # Post again with old token, is now revoked and should error
418        response = self.client.post(
419            reverse("authentik_providers_oauth2:token"),
420            data={
421                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
422                "refresh_token": new_token.token,
423                "redirect_uri": "http://local.invalid",
424            },
425            HTTP_AUTHORIZATION=f"Basic {header}",
426        )
427        self.assertEqual(response.status_code, 400)
428        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())
429
430    @apply_blueprint("system/providers-oauth2.yaml")
431    def test_refresh_token_view_threshold(self):
432        """refresh token threshold
433
434        threshold set to 1 hour, refresh token expires in 2 hours.
435        First request should not return a new refresh token, second request
436        has a fake time 1 hours in the future which should return a new access token"""
437        provider = OAuth2Provider.objects.create(
438            name=generate_id(),
439            authorization_flow=create_test_flow(),
440            grant_types=[GrantType.REFRESH_TOKEN],
441            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
442            signing_key=self.keypair,
443            refresh_token_threshold="hours=1",  # nosec
444        )
445        provider.property_mappings.set(
446            ScopeMapping.objects.filter(
447                managed__in=[
448                    "goauthentik.io/providers/oauth2/scope-openid",
449                    "goauthentik.io/providers/oauth2/scope-email",
450                    "goauthentik.io/providers/oauth2/scope-profile",
451                    "goauthentik.io/providers/oauth2/scope-offline_access",
452                ]
453            )
454        )
455        # Needs to be assigned to an application for iss to be set
456        self.app.provider = provider
457        self.app.save()
458        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
459        user = create_test_admin_user()
460        token = RefreshToken.objects.create(
461            provider=provider,
462            user=user,
463            token=generate_id(),
464            _id_token=dumps({}),
465            auth_time=timezone.now(),
466            _scope="offline_access",
467            expires=now() + timedelta(hours=2),
468        )
469        response = self.client.post(
470            reverse("authentik_providers_oauth2:token"),
471            data={
472                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
473                "refresh_token": token.token,
474                "redirect_uri": "http://local.invalid",
475            },
476            HTTP_AUTHORIZATION=f"Basic {header}",
477            HTTP_ORIGIN="http://local.invalid",
478        )
479        access = AccessToken.objects.filter(user=user, provider=provider).first()
480        self.assertJSONEqual(
481            response.content.decode(),
482            {
483                "access_token": access.token,
484                "token_type": TOKEN_TYPE,
485                "expires_in": 3600,
486                "id_token": provider.encode(
487                    access.id_token.to_dict(),
488                ),
489                "scope": "offline_access",
490            },
491        )
492        self.validate_jwt(access, provider)
493
494        with freeze_time(now() + timedelta(hours=1, minutes=10)):
495            response = self.client.post(
496                reverse("authentik_providers_oauth2:token"),
497                data={
498                    "grant_type": GRANT_TYPE_REFRESH_TOKEN,
499                    "refresh_token": token.token,
500                    "redirect_uri": "http://local.invalid",
501                },
502                HTTP_AUTHORIZATION=f"Basic {header}",
503                HTTP_ORIGIN="http://local.invalid",
504            )
505            access = AccessToken.objects.filter(user=user, provider=provider).first()
506            refresh = RefreshToken.objects.filter(user=user, provider=provider).last()
507            self.assertJSONEqual(
508                response.content.decode(),
509                {
510                    "access_token": access.token,
511                    "token_type": TOKEN_TYPE,
512                    "expires_in": 3600,
513                    "id_token": provider.encode(
514                        access.id_token.to_dict(),
515                    ),
516                    "scope": "offline_access",
517                    "refresh_token": refresh.token,
518                },
519            )
520            self.validate_jwt(access, provider)
521
522    @apply_blueprint("system/providers-oauth2.yaml")
523    def test_scope_claim_override_via_property_mapping(self):
524        """Test that property mappings can override the scope claim in access tokens.
525
526        See: https://github.com/goauthentik/authentik/issues/19224
527        """
528        # Create a custom scope mapping that returns a custom scope claim
529        custom_scope_mapping = ScopeMapping.objects.create(
530            name="custom-scope-override",
531            scope_name="custom",
532            expression='return {"scope": "custom-scope-value additional-scope"}',
533        )
534
535        provider = OAuth2Provider.objects.create(
536            name=generate_id(),
537            authorization_flow=create_test_flow(),
538            grant_types=[GrantType.AUTHORIZATION_CODE],
539            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
540            signing_key=self.keypair,
541            include_claims_in_id_token=True,
542        )
543        provider.property_mappings.add(custom_scope_mapping)
544
545        # Needs to be assigned to an application for iss to be set
546        self.app.provider = provider
547        self.app.save()
548
549        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
550        user = create_test_admin_user()
551        code = AuthorizationCode.objects.create(
552            code="foobar",
553            provider=provider,
554            user=user,
555            auth_time=timezone.now(),
556            _scope="openid custom",  # Request the custom scope
557        )
558
559        response = self.client.post(
560            reverse("authentik_providers_oauth2:token"),
561            data={
562                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
563                "code": code.code,
564                "redirect_uri": "http://local.invalid",
565            },
566            HTTP_AUTHORIZATION=f"Basic {header}",
567        )
568        self.assertEqual(response.status_code, 200)
569
570        access = AccessToken.objects.filter(user=user, provider=provider).first()
571        jwt_data = self.validate_jwt(access, provider)
572
573        # The scope should be the custom value from the property mapping,
574        # not the default "openid custom"
575        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")

Test token view

def setUp(self) -> None:
44    def setUp(self) -> None:
45        super().setUp()
46        self.factory = RequestFactory()
47        self.app = Application.objects.create(name=generate_id(), slug="test")

Hook method for setting up the test fixture before exercising it.

def test_invalid_grant_type(self):
49    def test_invalid_grant_type(self):
50        """test request param"""
51        provider = OAuth2Provider.objects.create(
52            name=generate_id(),
53            authorization_flow=create_test_flow(),
54            grant_types=[],
55            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
56            signing_key=self.keypair,
57        )
58        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
59        user = create_test_admin_user()
60        code = AuthorizationCode.objects.create(
61            code="foobar", provider=provider, user=user, auth_time=timezone.now()
62        )
63        request = self.factory.post(
64            "/",
65            data={
66                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
67                "code": code.code,
68                "redirect_uri": "http://TestServer",
69            },
70            HTTP_AUTHORIZATION=f"Basic {header}",
71        )
72        with self.assertRaises(TokenError) as cm:
73            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
74        self.assertEqual(cm.exception.cause, "grant_type_not_configured")

test request param

def test_request_auth_code(self):
 76    def test_request_auth_code(self):
 77        """test request param"""
 78        provider = OAuth2Provider.objects.create(
 79            name=generate_id(),
 80            authorization_flow=create_test_flow(),
 81            grant_types=[GrantType.AUTHORIZATION_CODE],
 82            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://TestServer")],
 83            signing_key=self.keypair,
 84        )
 85        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 86        user = create_test_admin_user()
 87        code = AuthorizationCode.objects.create(
 88            code="foobar", provider=provider, user=user, auth_time=timezone.now()
 89        )
 90        request = self.factory.post(
 91            "/",
 92            data={
 93                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 94                "code": code.code,
 95                "redirect_uri": "http://TestServer",
 96            },
 97            HTTP_AUTHORIZATION=f"Basic {header}",
 98        )
 99        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
100        self.assertEqual(params.provider, provider)
101        with self.assertRaises(TokenError):
102            TokenParams.parse(request, provider, provider.client_id, generate_key())

test request param

def test_request_auth_code_invalid(self):
104    def test_request_auth_code_invalid(self):
105        """test request param"""
106        provider = OAuth2Provider.objects.create(
107            name=generate_id(),
108            authorization_flow=create_test_flow(),
109            grant_types=[GrantType.REFRESH_TOKEN],
110            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
111            signing_key=self.keypair,
112        )
113        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
114        request = self.factory.post(
115            "/",
116            data={
117                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
118                "code": "foo",
119                "redirect_uri": "http://testserver",
120            },
121            HTTP_AUTHORIZATION=f"Basic {header}",
122        )
123        with self.assertRaises(TokenError):
124            TokenParams.parse(request, provider, provider.client_id, provider.client_secret)

test request param

def test_request_refresh_token(self):
126    def test_request_refresh_token(self):
127        """test request param"""
128        provider = OAuth2Provider.objects.create(
129            name=generate_id(),
130            authorization_flow=create_test_flow(),
131            grant_types=[GrantType.REFRESH_TOKEN],
132            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
133            signing_key=self.keypair,
134        )
135        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
136        user = create_test_admin_user()
137        token = RefreshToken.objects.create(
138            provider=provider,
139            user=user,
140            token=generate_id(),
141            auth_time=timezone.now(),
142        )
143        request = self.factory.post(
144            "/",
145            data={
146                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
147                "refresh_token": token.token,
148                "redirect_uri": "http://local.invalid",
149            },
150            HTTP_AUTHORIZATION=f"Basic {header}",
151        )
152        params = TokenParams.parse(request, provider, provider.client_id, provider.client_secret)
153        self.assertEqual(params.provider, provider)

test request param

def test_extract_client_auth_basic_auth_percent_decodes(self):
155    def test_extract_client_auth_basic_auth_percent_decodes(self):
156        """test percent-decoding of client credentials in Basic auth"""
157        header = b64encode(
158            f"{quote('client/id', safe='')}:{quote('secret+/==', safe='')}".encode()
159        ).decode()
160        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
161        self.assertEqual(extract_client_auth(request), ("client/id", "secret+/=="))

test percent-decoding of client credentials in Basic auth

def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
163    def test_extract_client_auth_basic_auth_preserves_raw_plus(self):
164        """test compatibility with clients that still send raw plus characters"""
165        header = b64encode(b"client:secret+plus").decode()
166        request = self.factory.post("/", HTTP_AUTHORIZATION=f"Basic {header}")
167        self.assertEqual(extract_client_auth(request), ("client", "secret+plus"))

test compatibility with clients that still send raw plus characters

def test_auth_code_view(self):
169    def test_auth_code_view(self):
170        """test request param"""
171        provider = OAuth2Provider.objects.create(
172            name=generate_id(),
173            authorization_flow=create_test_flow(),
174            grant_types=[GrantType.AUTHORIZATION_CODE],
175            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
176            signing_key=self.keypair,
177        )
178        # Needs to be assigned to an application for iss to be set
179        self.app.provider = provider
180        self.app.save()
181        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
182        user = create_test_admin_user()
183        code = AuthorizationCode.objects.create(
184            code="foobar", provider=provider, user=user, auth_time=timezone.now()
185        )
186        response = self.client.post(
187            reverse("authentik_providers_oauth2:token"),
188            data={
189                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
190                "code": code.code,
191                "redirect_uri": "http://local.invalid",
192            },
193            HTTP_AUTHORIZATION=f"Basic {header}",
194        )
195        access = AccessToken.objects.filter(user=user, provider=provider).first()
196        self.assertJSONEqual(
197            response.content.decode(),
198            {
199                "access_token": access.token,
200                "token_type": TOKEN_TYPE,
201                "expires_in": 3600,
202                "id_token": provider.encode(
203                    access.id_token.to_dict(),
204                ),
205                "scope": "",
206            },
207        )
208        self.validate_jwt(access, provider)

test request param

def test_auth_code_enc(self):
210    def test_auth_code_enc(self):
211        """test request param"""
212        provider = OAuth2Provider.objects.create(
213            name=generate_id(),
214            authorization_flow=create_test_flow(),
215            grant_types=[GrantType.AUTHORIZATION_CODE],
216            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
217            signing_key=self.keypair,
218            encryption_key=self.keypair,
219        )
220        # Needs to be assigned to an application for iss to be set
221        self.app.provider = provider
222        self.app.save()
223        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
224        user = create_test_admin_user()
225        code = AuthorizationCode.objects.create(
226            code="foobar", provider=provider, user=user, auth_time=timezone.now()
227        )
228        response = self.client.post(
229            reverse("authentik_providers_oauth2:token"),
230            data={
231                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
232                "code": code.code,
233                "redirect_uri": "http://local.invalid",
234            },
235            HTTP_AUTHORIZATION=f"Basic {header}",
236        )
237        self.assertEqual(response.status_code, 200)
238        access = AccessToken.objects.filter(user=user, provider=provider).first()
239        self.validate_jwe(access, provider)

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_view(self):
241    @apply_blueprint("system/providers-oauth2.yaml")
242    def test_refresh_token_view(self):
243        """test request param"""
244        provider = OAuth2Provider.objects.create(
245            name=generate_id(),
246            authorization_flow=create_test_flow(),
247            grant_types=[GrantType.REFRESH_TOKEN],
248            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
249            signing_key=self.keypair,
250        )
251        provider.property_mappings.set(
252            ScopeMapping.objects.filter(
253                managed__in=[
254                    "goauthentik.io/providers/oauth2/scope-openid",
255                    "goauthentik.io/providers/oauth2/scope-email",
256                    "goauthentik.io/providers/oauth2/scope-profile",
257                    "goauthentik.io/providers/oauth2/scope-offline_access",
258                ]
259            )
260        )
261        # Needs to be assigned to an application for iss to be set
262        self.app.provider = provider
263        self.app.save()
264        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
265        user = create_test_admin_user()
266        token = RefreshToken.objects.create(
267            provider=provider,
268            user=user,
269            token=generate_id(),
270            _id_token=dumps({}),
271            auth_time=timezone.now(),
272            _scope="offline_access",
273        )
274        response = self.client.post(
275            reverse("authentik_providers_oauth2:token"),
276            data={
277                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
278                "refresh_token": token.token,
279                "redirect_uri": "http://local.invalid",
280            },
281            HTTP_AUTHORIZATION=f"Basic {header}",
282            HTTP_ORIGIN="http://local.invalid",
283        )
284        self.assertEqual(response["Access-Control-Allow-Credentials"], "true")
285        self.assertEqual(response["Access-Control-Allow-Origin"], "http://local.invalid")
286        access = AccessToken.objects.filter(user=user, provider=provider).first()
287        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
288        self.assertJSONEqual(
289            response.content.decode(),
290            {
291                "access_token": access.token,
292                "refresh_token": refresh.token,
293                "token_type": TOKEN_TYPE,
294                "expires_in": 3600,
295                "id_token": provider.encode(
296                    access.id_token.to_dict(),
297                ),
298                "scope": "offline_access",
299            },
300        )
301        self.validate_jwt(access, provider)

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_view_invalid_origin(self):
303    @apply_blueprint("system/providers-oauth2.yaml")
304    def test_refresh_token_view_invalid_origin(self):
305        """test request param"""
306        provider = OAuth2Provider.objects.create(
307            name=generate_id(),
308            authorization_flow=create_test_flow(),
309            grant_types=[GrantType.REFRESH_TOKEN],
310            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
311            signing_key=self.keypair,
312        )
313        provider.property_mappings.set(
314            ScopeMapping.objects.filter(
315                managed__in=[
316                    "goauthentik.io/providers/oauth2/scope-openid",
317                    "goauthentik.io/providers/oauth2/scope-email",
318                    "goauthentik.io/providers/oauth2/scope-profile",
319                    "goauthentik.io/providers/oauth2/scope-offline_access",
320                ]
321            )
322        )
323        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
324        user = create_test_admin_user()
325        token = RefreshToken.objects.create(
326            provider=provider,
327            user=user,
328            token=generate_id(),
329            _id_token=dumps({}),
330            auth_time=timezone.now(),
331            _scope="offline_access",
332        )
333        response = self.client.post(
334            reverse("authentik_providers_oauth2:token"),
335            data={
336                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
337                "refresh_token": token.token,
338                "redirect_uri": "http://local.invalid",
339            },
340            HTTP_AUTHORIZATION=f"Basic {header}",
341            HTTP_ORIGIN="http://another.invalid",
342        )
343        access = AccessToken.objects.filter(user=user, provider=provider).first()
344        refresh = RefreshToken.objects.filter(user=user, provider=provider, revoked=False).first()
345        self.assertNotIn("Access-Control-Allow-Credentials", response)
346        self.assertNotIn("Access-Control-Allow-Origin", response)
347        self.assertJSONEqual(
348            response.content.decode(),
349            {
350                "access_token": access.token,
351                "refresh_token": refresh.token,
352                "token_type": TOKEN_TYPE,
353                "expires_in": 3600,
354                "id_token": provider.encode(
355                    access.id_token.to_dict(),
356                ),
357                "scope": "offline_access",
358            },
359        )

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_revoke(self):
361    @apply_blueprint("system/providers-oauth2.yaml")
362    def test_refresh_token_revoke(self):
363        """test request param"""
364        provider = OAuth2Provider.objects.create(
365            name=generate_id(),
366            authorization_flow=create_test_flow(),
367            grant_types=[GrantType.REFRESH_TOKEN],
368            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver")],
369            signing_key=self.keypair,
370        )
371        provider.property_mappings.set(
372            ScopeMapping.objects.filter(
373                managed__in=[
374                    "goauthentik.io/providers/oauth2/scope-openid",
375                    "goauthentik.io/providers/oauth2/scope-email",
376                    "goauthentik.io/providers/oauth2/scope-profile",
377                    "goauthentik.io/providers/oauth2/scope-offline_access",
378                ]
379            )
380        )
381        # Needs to be assigned to an application for iss to be set
382        self.app.provider = provider
383        self.app.save()
384        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
385        user = create_test_admin_user()
386        token = RefreshToken.objects.create(
387            provider=provider,
388            user=user,
389            token=generate_id(),
390            _id_token=dumps({}),
391            auth_time=timezone.now(),
392            _scope="offline_access",
393        )
394        # Create initial refresh token
395        response = self.client.post(
396            reverse("authentik_providers_oauth2:token"),
397            data={
398                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
399                "refresh_token": token.token,
400                "redirect_uri": "http://testserver",
401            },
402            HTTP_AUTHORIZATION=f"Basic {header}",
403        )
404        new_token = RefreshToken.objects.filter(user=user).exclude(pk=token.pk).first()
405        # Post again with initial token -> get new refresh token
406        # and revoke old one
407        response = self.client.post(
408            reverse("authentik_providers_oauth2:token"),
409            data={
410                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
411                "refresh_token": new_token.token,
412                "redirect_uri": "http://local.invalid",
413            },
414            HTTP_AUTHORIZATION=f"Basic {header}",
415        )
416        self.assertEqual(response.status_code, 200)
417        # Post again with old token, is now revoked and should error
418        response = self.client.post(
419            reverse("authentik_providers_oauth2:token"),
420            data={
421                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
422                "refresh_token": new_token.token,
423                "redirect_uri": "http://local.invalid",
424            },
425            HTTP_AUTHORIZATION=f"Basic {header}",
426        )
427        self.assertEqual(response.status_code, 400)
428        self.assertTrue(Event.objects.filter(action=EventAction.SUSPICIOUS_REQUEST).exists())

test request param

@apply_blueprint('system/providers-oauth2.yaml')
def test_refresh_token_view_threshold(self):
430    @apply_blueprint("system/providers-oauth2.yaml")
431    def test_refresh_token_view_threshold(self):
432        """refresh token threshold
433
434        threshold set to 1 hour, refresh token expires in 2 hours.
435        First request should not return a new refresh token, second request
436        has a fake time 1 hours in the future which should return a new access token"""
437        provider = OAuth2Provider.objects.create(
438            name=generate_id(),
439            authorization_flow=create_test_flow(),
440            grant_types=[GrantType.REFRESH_TOKEN],
441            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
442            signing_key=self.keypair,
443            refresh_token_threshold="hours=1",  # nosec
444        )
445        provider.property_mappings.set(
446            ScopeMapping.objects.filter(
447                managed__in=[
448                    "goauthentik.io/providers/oauth2/scope-openid",
449                    "goauthentik.io/providers/oauth2/scope-email",
450                    "goauthentik.io/providers/oauth2/scope-profile",
451                    "goauthentik.io/providers/oauth2/scope-offline_access",
452                ]
453            )
454        )
455        # Needs to be assigned to an application for iss to be set
456        self.app.provider = provider
457        self.app.save()
458        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
459        user = create_test_admin_user()
460        token = RefreshToken.objects.create(
461            provider=provider,
462            user=user,
463            token=generate_id(),
464            _id_token=dumps({}),
465            auth_time=timezone.now(),
466            _scope="offline_access",
467            expires=now() + timedelta(hours=2),
468        )
469        response = self.client.post(
470            reverse("authentik_providers_oauth2:token"),
471            data={
472                "grant_type": GRANT_TYPE_REFRESH_TOKEN,
473                "refresh_token": token.token,
474                "redirect_uri": "http://local.invalid",
475            },
476            HTTP_AUTHORIZATION=f"Basic {header}",
477            HTTP_ORIGIN="http://local.invalid",
478        )
479        access = AccessToken.objects.filter(user=user, provider=provider).first()
480        self.assertJSONEqual(
481            response.content.decode(),
482            {
483                "access_token": access.token,
484                "token_type": TOKEN_TYPE,
485                "expires_in": 3600,
486                "id_token": provider.encode(
487                    access.id_token.to_dict(),
488                ),
489                "scope": "offline_access",
490            },
491        )
492        self.validate_jwt(access, provider)
493
494        with freeze_time(now() + timedelta(hours=1, minutes=10)):
495            response = self.client.post(
496                reverse("authentik_providers_oauth2:token"),
497                data={
498                    "grant_type": GRANT_TYPE_REFRESH_TOKEN,
499                    "refresh_token": token.token,
500                    "redirect_uri": "http://local.invalid",
501                },
502                HTTP_AUTHORIZATION=f"Basic {header}",
503                HTTP_ORIGIN="http://local.invalid",
504            )
505            access = AccessToken.objects.filter(user=user, provider=provider).first()
506            refresh = RefreshToken.objects.filter(user=user, provider=provider).last()
507            self.assertJSONEqual(
508                response.content.decode(),
509                {
510                    "access_token": access.token,
511                    "token_type": TOKEN_TYPE,
512                    "expires_in": 3600,
513                    "id_token": provider.encode(
514                        access.id_token.to_dict(),
515                    ),
516                    "scope": "offline_access",
517                    "refresh_token": refresh.token,
518                },
519            )
520            self.validate_jwt(access, provider)

refresh token threshold

threshold set to 1 hour, refresh token expires in 2 hours. First request should not return a new refresh token, second request has a fake time 1 hours in the future which should return a new access token

@apply_blueprint('system/providers-oauth2.yaml')
def test_scope_claim_override_via_property_mapping(self):
522    @apply_blueprint("system/providers-oauth2.yaml")
523    def test_scope_claim_override_via_property_mapping(self):
524        """Test that property mappings can override the scope claim in access tokens.
525
526        See: https://github.com/goauthentik/authentik/issues/19224
527        """
528        # Create a custom scope mapping that returns a custom scope claim
529        custom_scope_mapping = ScopeMapping.objects.create(
530            name="custom-scope-override",
531            scope_name="custom",
532            expression='return {"scope": "custom-scope-value additional-scope"}',
533        )
534
535        provider = OAuth2Provider.objects.create(
536            name=generate_id(),
537            authorization_flow=create_test_flow(),
538            grant_types=[GrantType.AUTHORIZATION_CODE],
539            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
540            signing_key=self.keypair,
541            include_claims_in_id_token=True,
542        )
543        provider.property_mappings.add(custom_scope_mapping)
544
545        # Needs to be assigned to an application for iss to be set
546        self.app.provider = provider
547        self.app.save()
548
549        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
550        user = create_test_admin_user()
551        code = AuthorizationCode.objects.create(
552            code="foobar",
553            provider=provider,
554            user=user,
555            auth_time=timezone.now(),
556            _scope="openid custom",  # Request the custom scope
557        )
558
559        response = self.client.post(
560            reverse("authentik_providers_oauth2:token"),
561            data={
562                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
563                "code": code.code,
564                "redirect_uri": "http://local.invalid",
565            },
566            HTTP_AUTHORIZATION=f"Basic {header}",
567        )
568        self.assertEqual(response.status_code, 200)
569
570        access = AccessToken.objects.filter(user=user, provider=provider).first()
571        jwt_data = self.validate_jwt(access, provider)
572
573        # The scope should be the custom value from the property mapping,
574        # not the default "openid custom"
575        self.assertEqual(jwt_data["scope"], "custom-scope-value additional-scope")

Test that property mappings can override the scope claim in access tokens.

See: https://github.com/goauthentik/authentik/issues/19224