authentik.providers.oauth2.tests.test_authorize

Test authorize view

  1"""Test authorize view"""
  2
  3from unittest.mock import MagicMock, patch
  4from urllib.parse import parse_qs, urlparse
  5
  6from django.test import RequestFactory
  7from django.urls import reverse
  8from django.utils import translation
  9from django.utils.timezone import now
 10
 11from authentik.blueprints.tests import apply_blueprint
 12from authentik.common.oauth.constants import SCOPE_OFFLINE_ACCESS, SCOPE_OPENID, TOKEN_TYPE
 13from authentik.core.models import Application
 14from authentik.core.tests.utils import create_test_admin_user, create_test_brand, create_test_flow
 15from authentik.events.models import Event, EventAction
 16from authentik.flows.models import FlowStageBinding
 17from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
 18from authentik.flows.views.executor import SESSION_KEY_PLAN
 19from authentik.lib.generators import generate_id
 20from authentik.lib.utils.time import timedelta_from_string
 21from authentik.providers.oauth2.errors import AuthorizeError, ClientIdError, RedirectUriError
 22from authentik.providers.oauth2.models import (
 23    AccessToken,
 24    AuthorizationCode,
 25    GrantTypes,
 26    OAuth2Provider,
 27    RedirectURI,
 28    RedirectURIMatchingMode,
 29    ScopeMapping,
 30)
 31from authentik.providers.oauth2.tests.utils import OAuthTestCase
 32from authentik.providers.oauth2.views.authorize import OAuthAuthorizationParams
 33from authentik.stages.dummy.models import DummyStage
 34from authentik.stages.password.stage import PLAN_CONTEXT_METHOD
 35
 36
 37class TestAuthorize(OAuthTestCase):
 38    """Test authorize view"""
 39
 40    def setUp(self) -> None:
 41        super().setUp()
 42        self.factory = RequestFactory()
 43
 44    def test_invalid_grant_type(self):
 45        """Test with invalid grant type"""
 46        OAuth2Provider.objects.create(
 47            name=generate_id(),
 48            client_id="test",
 49            authorization_flow=create_test_flow(),
 50            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 51        )
 52        with self.assertRaises(AuthorizeError) as cm:
 53            request = self.factory.get(
 54                "/",
 55                data={
 56                    "response_type": "invalid",
 57                    "client_id": "test",
 58                    "redirect_uri": "http://local.invalid/Foo",
 59                },
 60            )
 61            OAuthAuthorizationParams.from_request(request)
 62        self.assertEqual(cm.exception.error, "unsupported_response_type")
 63
 64    def test_invalid_client_id(self):
 65        """Test invalid client ID"""
 66        with self.assertRaises(ClientIdError):
 67            request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"})
 68            OAuthAuthorizationParams.from_request(request)
 69
 70    def test_request(self):
 71        """test request param"""
 72        OAuth2Provider.objects.create(
 73            name=generate_id(),
 74            client_id="test",
 75            authorization_flow=create_test_flow(),
 76            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 77        )
 78        with self.assertRaises(AuthorizeError) as cm:
 79            request = self.factory.get(
 80                "/",
 81                data={
 82                    "response_type": "code",
 83                    "client_id": "test",
 84                    "redirect_uri": "http://local.invalid/Foo",
 85                    "request": "foo",
 86                },
 87            )
 88            OAuthAuthorizationParams.from_request(request)
 89        self.assertEqual(cm.exception.error, "request_not_supported")
 90
 91    def test_invalid_redirect_uri_missing(self):
 92        """test missing redirect URI"""
 93        OAuth2Provider.objects.create(
 94            name=generate_id(),
 95            client_id="test",
 96            authorization_flow=create_test_flow(),
 97            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
 98        )
 99        with self.assertRaises(RedirectUriError) as cm:
100            request = self.factory.get("/", data={"response_type": "code", "client_id": "test"})
101            OAuthAuthorizationParams.from_request(request)
102        self.assertEqual(cm.exception.cause, "redirect_uri_missing")
103
104    def test_invalid_redirect_uri(self):
105        """test invalid redirect URI"""
106        OAuth2Provider.objects.create(
107            name=generate_id(),
108            client_id="test",
109            authorization_flow=create_test_flow(),
110            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
111        )
112        with self.assertRaises(RedirectUriError) as cm:
113            request = self.factory.get(
114                "/",
115                data={
116                    "response_type": "code",
117                    "client_id": "test",
118                    "redirect_uri": "http://localhost",
119                },
120            )
121            OAuthAuthorizationParams.from_request(request)
122        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
123
124    def test_blocked_redirect_uri(self):
125        """test missing/invalid redirect URI"""
126        OAuth2Provider.objects.create(
127            name=generate_id(),
128            client_id="test",
129            authorization_flow=create_test_flow(),
130            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
131        )
132        with self.assertRaises(RedirectUriError) as cm:
133            request = self.factory.get(
134                "/",
135                data={
136                    "response_type": "code",
137                    "client_id": "test",
138                    "redirect_uri": "data:localhost",
139                },
140            )
141            OAuthAuthorizationParams.from_request(request)
142        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")
143
144    def test_invalid_redirect_uri_empty(self):
145        """test missing/invalid redirect URI"""
146        provider = OAuth2Provider.objects.create(
147            name=generate_id(),
148            client_id="test",
149            authorization_flow=create_test_flow(),
150            redirect_uris=[],
151        )
152        request = self.factory.get(
153            "/",
154            data={
155                "response_type": "code",
156                "client_id": "test",
157                "redirect_uri": "+",
158            },
159        )
160        OAuthAuthorizationParams.from_request(request)
161        provider.refresh_from_db()
162        self.assertEqual(provider.redirect_uris, [RedirectURI(RedirectURIMatchingMode.STRICT, "+")])
163
164    def test_invalid_redirect_uri_regex(self):
165        """test missing/invalid redirect URI"""
166        OAuth2Provider.objects.create(
167            name=generate_id(),
168            client_id="test",
169            authorization_flow=create_test_flow(),
170            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
171        )
172        with self.assertRaises(RedirectUriError) as cm:
173            request = self.factory.get(
174                "/",
175                data={
176                    "response_type": "code",
177                    "client_id": "test",
178                    "redirect_uri": "http://localhost",
179                },
180            )
181            OAuthAuthorizationParams.from_request(request)
182        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
183
184    def test_redirect_uri_invalid_regex(self):
185        """test missing/invalid redirect URI (invalid regex)"""
186        OAuth2Provider.objects.create(
187            name=generate_id(),
188            client_id="test",
189            authorization_flow=create_test_flow(),
190            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
191        )
192        with self.assertRaises(RedirectUriError) as cm:
193            request = self.factory.get(
194                "/",
195                data={
196                    "response_type": "code",
197                    "client_id": "test",
198                    "redirect_uri": "http://localhost",
199                },
200            )
201            OAuthAuthorizationParams.from_request(request)
202        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
203
204    def test_redirect_uri_regex(self):
205        """test valid redirect URI (regex)"""
206        OAuth2Provider.objects.create(
207            name=generate_id(),
208            client_id="test",
209            authorization_flow=create_test_flow(),
210            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
211        )
212        request = self.factory.get(
213            "/",
214            data={
215                "response_type": "code",
216                "client_id": "test",
217                "redirect_uri": "http://foo.bar.baz",
218            },
219        )
220        OAuthAuthorizationParams.from_request(request)
221
222    @apply_blueprint("system/providers-oauth2.yaml")
223    def test_response_type(self):
224        """test response_type"""
225        provider = OAuth2Provider.objects.create(
226            name=generate_id(),
227            client_id="test",
228            authorization_flow=create_test_flow(),
229            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
230        )
231        provider.property_mappings.set(
232            ScopeMapping.objects.filter(
233                managed__in=[
234                    "goauthentik.io/providers/oauth2/scope-openid",
235                    "goauthentik.io/providers/oauth2/scope-email",
236                    "goauthentik.io/providers/oauth2/scope-profile",
237                ]
238            )
239        )
240        request = self.factory.get(
241            "/",
242            data={
243                "response_type": "code",
244                "client_id": "test",
245                "redirect_uri": "http://local.invalid/Foo",
246            },
247        )
248        self.assertEqual(
249            OAuthAuthorizationParams.from_request(request).grant_type,
250            GrantTypes.AUTHORIZATION_CODE,
251        )
252        self.assertEqual(
253            OAuthAuthorizationParams.from_request(request).redirect_uri,
254            "http://local.invalid/Foo",
255        )
256        request = self.factory.get(
257            "/",
258            data={
259                "response_type": "id_token",
260                "client_id": "test",
261                "redirect_uri": "http://local.invalid/Foo",
262                "scope": "openid",
263                "state": "foo",
264                "nonce": generate_id(),
265            },
266        )
267        self.assertEqual(
268            OAuthAuthorizationParams.from_request(request).grant_type,
269            GrantTypes.IMPLICIT,
270        )
271        # Implicit without openid scope
272        with self.assertRaises(AuthorizeError) as cm:
273            request = self.factory.get(
274                "/",
275                data={
276                    "response_type": "id_token",
277                    "client_id": "test",
278                    "redirect_uri": "http://local.invalid/Foo",
279                    "state": "foo",
280                },
281            )
282            self.assertEqual(
283                OAuthAuthorizationParams.from_request(request).grant_type,
284                GrantTypes.IMPLICIT,
285            )
286        request = self.factory.get(
287            "/",
288            data={
289                "response_type": "code token",
290                "client_id": "test",
291                "redirect_uri": "http://local.invalid/Foo",
292                "scope": "openid",
293                "state": "foo",
294            },
295        )
296        self.assertEqual(
297            OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID
298        )
299        with self.assertRaises(AuthorizeError) as cm:
300            request = self.factory.get(
301                "/",
302                data={
303                    "response_type": "invalid",
304                    "client_id": "test",
305                    "redirect_uri": "http://local.invalid/Foo",
306                },
307            )
308            OAuthAuthorizationParams.from_request(request)
309        self.assertEqual(cm.exception.error, "unsupported_response_type")
310
311    def test_full_code(self):
312        """Test full authorization"""
313        flow = create_test_flow()
314        provider = OAuth2Provider.objects.create(
315            name=generate_id(),
316            client_id="test",
317            authorization_flow=flow,
318            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
319            access_code_validity="seconds=100",
320        )
321        Application.objects.create(name="app", slug="app", provider=provider)
322        state = generate_id()
323        user = create_test_admin_user()
324        self.client.force_login(user)
325        # Step 1, initiate params and get redirect to flow
326        response = self.client.get(
327            reverse("authentik_providers_oauth2:authorize"),
328            data={
329                "response_type": "code",
330                "client_id": "test",
331                "state": state,
332                "redirect_uri": "foo://localhost",
333            },
334        )
335        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
336        self.assertEqual(
337            response.url,
338            f"foo://localhost?code={code.code}&state={state}",
339        )
340        self.assertAlmostEqual(
341            code.expires.timestamp() - now().timestamp(),
342            timedelta_from_string(provider.access_code_validity).total_seconds(),
343            delta=5,
344        )
345
346    @apply_blueprint("system/providers-oauth2.yaml")
347    def test_full_implicit(self):
348        """Test full authorization"""
349        flow = create_test_flow()
350        provider: OAuth2Provider = OAuth2Provider.objects.create(
351            name=generate_id(),
352            client_id="test",
353            authorization_flow=flow,
354            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
355            signing_key=self.keypair,
356        )
357        provider.property_mappings.set(
358            ScopeMapping.objects.filter(
359                managed__in=[
360                    "goauthentik.io/providers/oauth2/scope-openid",
361                    "goauthentik.io/providers/oauth2/scope-email",
362                    "goauthentik.io/providers/oauth2/scope-profile",
363                ]
364            )
365        )
366        provider.property_mappings.add(
367            ScopeMapping.objects.create(
368                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
369            )
370        )
371        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
372        state = generate_id()
373        user = create_test_admin_user()
374        self.client.force_login(user)
375        with patch(
376            "authentik.providers.oauth2.id_token.get_login_event",
377            MagicMock(
378                return_value=Event(
379                    action=EventAction.LOGIN,
380                    context={PLAN_CONTEXT_METHOD: "password"},
381                    created=now(),
382                )
383            ),
384        ):
385            # Step 1, initiate params and get redirect to flow
386            response = self.client.get(
387                reverse("authentik_providers_oauth2:authorize"),
388                data={
389                    "response_type": "id_token",
390                    "client_id": "test",
391                    "state": state,
392                    "scope": "openid test",
393                    "redirect_uri": "http://localhost",
394                    "nonce": generate_id(),
395                },
396            )
397            token: AccessToken = AccessToken.objects.filter(user=user).first()
398            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
399            self.assertEqual(
400                response.url,
401                (
402                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
403                    f"&token_type={TOKEN_TYPE}"
404                    f"&expires_in={int(expires)}&state={state}"
405                ),
406            )
407            jwt = self.validate_jwt(token, provider)
408            self.assertEqual(jwt["amr"], ["pwd"])
409            self.assertEqual(jwt["sub"], "foo")
410            self.assertAlmostEqual(
411                jwt["exp"] - now().timestamp(),
412                expires,
413                delta=5,
414            )
415
416    @apply_blueprint("system/providers-oauth2.yaml")
417    def test_full_implicit_enc(self):
418        """Test full authorization with encryption"""
419        flow = create_test_flow()
420        provider: OAuth2Provider = OAuth2Provider.objects.create(
421            name=generate_id(),
422            client_id="test",
423            authorization_flow=flow,
424            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
425            signing_key=self.keypair,
426            encryption_key=self.keypair,
427        )
428        provider.property_mappings.set(
429            ScopeMapping.objects.filter(
430                managed__in=[
431                    "goauthentik.io/providers/oauth2/scope-openid",
432                    "goauthentik.io/providers/oauth2/scope-email",
433                    "goauthentik.io/providers/oauth2/scope-profile",
434                ]
435            )
436        )
437        provider.property_mappings.add(
438            ScopeMapping.objects.create(
439                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
440            )
441        )
442        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
443        state = generate_id()
444        user = create_test_admin_user()
445        self.client.force_login(user)
446        with patch(
447            "authentik.providers.oauth2.id_token.get_login_event",
448            MagicMock(
449                return_value=Event(
450                    action=EventAction.LOGIN,
451                    context={PLAN_CONTEXT_METHOD: "password"},
452                    created=now(),
453                )
454            ),
455        ):
456            # Step 1, initiate params and get redirect to flow
457            response = self.client.get(
458                reverse("authentik_providers_oauth2:authorize"),
459                data={
460                    "response_type": "id_token",
461                    "client_id": "test",
462                    "state": state,
463                    "scope": "openid test",
464                    "redirect_uri": "http://localhost",
465                    "nonce": generate_id(),
466                },
467            )
468            self.assertEqual(response.status_code, 302)
469            token: AccessToken = AccessToken.objects.filter(user=user).first()
470            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
471            jwt = self.validate_jwe(token, provider)
472            self.assertEqual(jwt["amr"], ["pwd"])
473            self.assertEqual(jwt["sub"], "foo")
474            self.assertAlmostEqual(
475                jwt["exp"] - now().timestamp(),
476                expires,
477                delta=5,
478            )
479
480    def test_full_fragment_code(self):
481        """Test full authorization"""
482        flow = create_test_flow()
483        provider: OAuth2Provider = OAuth2Provider.objects.create(
484            name=generate_id(),
485            client_id="test",
486            authorization_flow=flow,
487            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
488            signing_key=self.keypair,
489        )
490        Application.objects.create(name="app", slug="app", provider=provider)
491        state = generate_id()
492        user = create_test_admin_user()
493        self.client.force_login(user)
494        with patch(
495            "authentik.providers.oauth2.id_token.get_login_event",
496            MagicMock(
497                return_value=Event(
498                    action=EventAction.LOGIN,
499                    context={PLAN_CONTEXT_METHOD: "password"},
500                    created=now(),
501                )
502            ),
503        ):
504            # Step 1, initiate params and get redirect to flow
505            response = self.client.get(
506                reverse("authentik_providers_oauth2:authorize"),
507                data={
508                    "response_type": "code",
509                    "response_mode": "fragment",
510                    "client_id": "test",
511                    "state": state,
512                    "scope": "openid",
513                    "redirect_uri": "http://localhost",
514                    "nonce": generate_id(),
515                },
516            )
517            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
518            self.assertEqual(
519                response.url,
520                f"http://localhost#code={code.code}&state={state}",
521            )
522            self.assertAlmostEqual(
523                code.expires.timestamp() - now().timestamp(),
524                timedelta_from_string(provider.access_code_validity).total_seconds(),
525                delta=5,
526            )
527
528    @apply_blueprint("system/providers-oauth2.yaml")
529    def test_full_form_post_id_token(self):
530        """Test full authorization (form_post response)"""
531        flow = create_test_flow()
532        provider = OAuth2Provider.objects.create(
533            name=generate_id(),
534            client_id=generate_id(),
535            authorization_flow=flow,
536            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
537            signing_key=self.keypair,
538        )
539        provider.property_mappings.set(
540            ScopeMapping.objects.filter(
541                managed__in=[
542                    "goauthentik.io/providers/oauth2/scope-openid",
543                    "goauthentik.io/providers/oauth2/scope-email",
544                    "goauthentik.io/providers/oauth2/scope-profile",
545                ]
546            )
547        )
548        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
549        state = generate_id()
550        user = create_test_admin_user()
551        self.client.force_login(user)
552        # Step 1, initiate params and get redirect to flow
553        self.client.get(
554            reverse("authentik_providers_oauth2:authorize"),
555            data={
556                "response_type": "id_token",
557                "response_mode": "form_post",
558                "client_id": provider.client_id,
559                "state": state,
560                "scope": "openid",
561                "redirect_uri": "http://localhost",
562                "nonce": generate_id(),
563            },
564        )
565        response = self.client.get(
566            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
567        )
568        token: AccessToken = AccessToken.objects.filter(user=user).first()
569        self.assertIsNotNone(token)
570        self.assertJSONEqual(
571            response.content.decode(),
572            {
573                "component": "ak-stage-autosubmit",
574                "url": "http://localhost",
575                "title": f"Redirecting to {app.name}...",
576                "attrs": {
577                    "id_token": provider.encode(token.id_token.to_dict()),
578                    "token_type": TOKEN_TYPE,
579                    "expires_in": "3600",
580                    "state": state,
581                },
582            },
583        )
584        self.validate_jwt(token, provider)
585
586    def test_full_form_post_code(self):
587        """Test full authorization (form_post response, code type)"""
588        flow = create_test_flow()
589        provider = OAuth2Provider.objects.create(
590            name=generate_id(),
591            client_id=generate_id(),
592            authorization_flow=flow,
593            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
594            signing_key=self.keypair,
595        )
596        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
597        state = generate_id()
598        user = create_test_admin_user()
599        self.client.force_login(user)
600        # Step 1, initiate params and get redirect to flow
601        self.client.get(
602            reverse("authentik_providers_oauth2:authorize"),
603            data={
604                "response_type": "code",
605                "response_mode": "form_post",
606                "client_id": provider.client_id,
607                "state": state,
608                "scope": "openid",
609                "redirect_uri": "http://localhost",
610            },
611        )
612        response = self.client.get(
613            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
614        )
615        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
616        self.assertJSONEqual(
617            response.content.decode(),
618            {
619                "component": "ak-stage-autosubmit",
620                "url": "http://localhost",
621                "title": f"Redirecting to {app.name}...",
622                "attrs": {
623                    "code": code.code,
624                    "state": state,
625                },
626            },
627        )
628
629    def test_openid_missing_invalid(self):
630        """test request requiring an OpenID scope to be set"""
631        OAuth2Provider.objects.create(
632            name=generate_id(),
633            client_id="test",
634            authorization_flow=create_test_flow(),
635            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
636        )
637        request = self.factory.get(
638            "/",
639            data={
640                "response_type": "id_token",
641                "client_id": "test",
642                "redirect_uri": "http://localhost",
643                "scope": "",
644            },
645        )
646        with self.assertRaises(AuthorizeError) as cm:
647            OAuthAuthorizationParams.from_request(request)
648        self.assertEqual(cm.exception.cause, "scope_openid_missing")
649
650    @apply_blueprint("system/providers-oauth2.yaml")
651    def test_offline_access_invalid(self):
652        """test request for offline_access with invalid response type"""
653        provider = OAuth2Provider.objects.create(
654            name=generate_id(),
655            client_id="test",
656            authorization_flow=create_test_flow(),
657            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
658        )
659        provider.property_mappings.set(
660            ScopeMapping.objects.filter(
661                managed__in=[
662                    "goauthentik.io/providers/oauth2/scope-openid",
663                    "goauthentik.io/providers/oauth2/scope-offline_access",
664                ]
665            )
666        )
667        request = self.factory.get(
668            "/",
669            data={
670                "response_type": "id_token",
671                "client_id": "test",
672                "redirect_uri": "http://localhost",
673                "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
674                "nonce": generate_id(),
675            },
676        )
677        parsed = OAuthAuthorizationParams.from_request(request)
678        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)
679
680    @apply_blueprint("default/flow-default-authentication-flow.yaml")
681    def test_ui_locales(self):
682        """Test OIDC ui_locales authorization"""
683        flow = create_test_flow()
684        provider = OAuth2Provider.objects.create(
685            name=generate_id(),
686            client_id="test",
687            authorization_flow=flow,
688            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
689            access_code_validity="seconds=100",
690        )
691        Application.objects.create(name="app", slug="app", provider=provider)
692        state = generate_id()
693        self.client.logout()
694        try:
695            response = self.client.get(
696                reverse("authentik_providers_oauth2:authorize"),
697                data={
698                    "response_type": "code",
699                    "client_id": "test",
700                    "state": state,
701                    "redirect_uri": "foo://localhost",
702                    "ui_locales": "invalid fr",
703                },
704            )
705            parsed = parse_qs(urlparse(response.url).query)
706            self.assertEqual(parsed["locale"], ["fr"])
707        finally:
708            translation.deactivate()
709
710    @apply_blueprint("default/flow-default-authentication-flow.yaml")
711    def test_ui_locales_invalid(self):
712        """Test OIDC ui_locales authorization"""
713        flow = create_test_flow()
714        provider = OAuth2Provider.objects.create(
715            name=generate_id(),
716            client_id="test",
717            authorization_flow=flow,
718            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
719            access_code_validity="seconds=100",
720        )
721        Application.objects.create(name="app", slug="app", provider=provider)
722        state = generate_id()
723        self.client.logout()
724        response = self.client.get(
725            reverse("authentik_providers_oauth2:authorize"),
726            data={
727                "response_type": "code",
728                "client_id": "test",
729                "state": state,
730                "redirect_uri": "foo://localhost",
731                "ui_locales": "invalid",
732            },
733        )
734        parsed = parse_qs(urlparse(response.url).query)
735        self.assertNotIn("locale", parsed)
736
737    def test_authentication_flow(self):
738        """Test custom authentication flow"""
739        brand = create_test_brand()
740        global_auth = create_test_flow()
741        FlowStageBinding.objects.create(
742            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
743        )
744        brand.flow_authentication = global_auth
745        brand.save()
746
747        flow = create_test_flow()
748        auth_flow = create_test_flow()
749        FlowStageBinding.objects.create(
750            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
751        )
752        provider = OAuth2Provider.objects.create(
753            name=generate_id(),
754            client_id="test",
755            authorization_flow=flow,
756            authentication_flow=auth_flow,
757            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
758            access_code_validity="seconds=100",
759        )
760        Application.objects.create(name="app", slug="app", provider=provider)
761        state = generate_id()
762        response = self.client.get(
763            reverse("authentik_providers_oauth2:authorize"),
764            data={
765                "response_type": "code",
766                "client_id": "test",
767                "state": state,
768                "redirect_uri": "foo://localhost",
769            },
770        )
771        self.assertEqual(response.status_code, 302)
772        self.assertIn(auth_flow.slug, response.url)
773        self.assertNotIn(global_auth.slug, response.url)
774
775    @apply_blueprint("default/flow-default-authentication-flow.yaml")
776    def test_login_hint(self):
777        """Login hint"""
778        flow = create_test_flow()
779        provider = OAuth2Provider.objects.create(
780            name=generate_id(),
781            client_id="test",
782            authorization_flow=flow,
783            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
784            access_code_validity="seconds=100",
785        )
786        Application.objects.create(name="app", slug="app", provider=provider)
787        state = generate_id()
788        response = self.client.get(
789            reverse("authentik_providers_oauth2:authorize"),
790            data={
791                "response_type": "code",
792                "client_id": "test",
793                "state": state,
794                "redirect_uri": "foo://localhost",
795                "login_hint": "foo",
796            },
797        )
798        self.assertEqual(response.status_code, 302)
799        plan = self.client.session.get(SESSION_KEY_PLAN)
800        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
class TestAuthorize(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 38class TestAuthorize(OAuthTestCase):
 39    """Test authorize view"""
 40
 41    def setUp(self) -> None:
 42        super().setUp()
 43        self.factory = RequestFactory()
 44
 45    def test_invalid_grant_type(self):
 46        """Test with invalid grant type"""
 47        OAuth2Provider.objects.create(
 48            name=generate_id(),
 49            client_id="test",
 50            authorization_flow=create_test_flow(),
 51            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 52        )
 53        with self.assertRaises(AuthorizeError) as cm:
 54            request = self.factory.get(
 55                "/",
 56                data={
 57                    "response_type": "invalid",
 58                    "client_id": "test",
 59                    "redirect_uri": "http://local.invalid/Foo",
 60                },
 61            )
 62            OAuthAuthorizationParams.from_request(request)
 63        self.assertEqual(cm.exception.error, "unsupported_response_type")
 64
 65    def test_invalid_client_id(self):
 66        """Test invalid client ID"""
 67        with self.assertRaises(ClientIdError):
 68            request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"})
 69            OAuthAuthorizationParams.from_request(request)
 70
 71    def test_request(self):
 72        """test request param"""
 73        OAuth2Provider.objects.create(
 74            name=generate_id(),
 75            client_id="test",
 76            authorization_flow=create_test_flow(),
 77            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 78        )
 79        with self.assertRaises(AuthorizeError) as cm:
 80            request = self.factory.get(
 81                "/",
 82                data={
 83                    "response_type": "code",
 84                    "client_id": "test",
 85                    "redirect_uri": "http://local.invalid/Foo",
 86                    "request": "foo",
 87                },
 88            )
 89            OAuthAuthorizationParams.from_request(request)
 90        self.assertEqual(cm.exception.error, "request_not_supported")
 91
 92    def test_invalid_redirect_uri_missing(self):
 93        """test missing redirect URI"""
 94        OAuth2Provider.objects.create(
 95            name=generate_id(),
 96            client_id="test",
 97            authorization_flow=create_test_flow(),
 98            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
 99        )
100        with self.assertRaises(RedirectUriError) as cm:
101            request = self.factory.get("/", data={"response_type": "code", "client_id": "test"})
102            OAuthAuthorizationParams.from_request(request)
103        self.assertEqual(cm.exception.cause, "redirect_uri_missing")
104
105    def test_invalid_redirect_uri(self):
106        """test invalid redirect URI"""
107        OAuth2Provider.objects.create(
108            name=generate_id(),
109            client_id="test",
110            authorization_flow=create_test_flow(),
111            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
112        )
113        with self.assertRaises(RedirectUriError) as cm:
114            request = self.factory.get(
115                "/",
116                data={
117                    "response_type": "code",
118                    "client_id": "test",
119                    "redirect_uri": "http://localhost",
120                },
121            )
122            OAuthAuthorizationParams.from_request(request)
123        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
124
125    def test_blocked_redirect_uri(self):
126        """test missing/invalid redirect URI"""
127        OAuth2Provider.objects.create(
128            name=generate_id(),
129            client_id="test",
130            authorization_flow=create_test_flow(),
131            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
132        )
133        with self.assertRaises(RedirectUriError) as cm:
134            request = self.factory.get(
135                "/",
136                data={
137                    "response_type": "code",
138                    "client_id": "test",
139                    "redirect_uri": "data:localhost",
140                },
141            )
142            OAuthAuthorizationParams.from_request(request)
143        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")
144
145    def test_invalid_redirect_uri_empty(self):
146        """test missing/invalid redirect URI"""
147        provider = OAuth2Provider.objects.create(
148            name=generate_id(),
149            client_id="test",
150            authorization_flow=create_test_flow(),
151            redirect_uris=[],
152        )
153        request = self.factory.get(
154            "/",
155            data={
156                "response_type": "code",
157                "client_id": "test",
158                "redirect_uri": "+",
159            },
160        )
161        OAuthAuthorizationParams.from_request(request)
162        provider.refresh_from_db()
163        self.assertEqual(provider.redirect_uris, [RedirectURI(RedirectURIMatchingMode.STRICT, "+")])
164
165    def test_invalid_redirect_uri_regex(self):
166        """test missing/invalid redirect URI"""
167        OAuth2Provider.objects.create(
168            name=generate_id(),
169            client_id="test",
170            authorization_flow=create_test_flow(),
171            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
172        )
173        with self.assertRaises(RedirectUriError) as cm:
174            request = self.factory.get(
175                "/",
176                data={
177                    "response_type": "code",
178                    "client_id": "test",
179                    "redirect_uri": "http://localhost",
180                },
181            )
182            OAuthAuthorizationParams.from_request(request)
183        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
184
185    def test_redirect_uri_invalid_regex(self):
186        """test missing/invalid redirect URI (invalid regex)"""
187        OAuth2Provider.objects.create(
188            name=generate_id(),
189            client_id="test",
190            authorization_flow=create_test_flow(),
191            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
192        )
193        with self.assertRaises(RedirectUriError) as cm:
194            request = self.factory.get(
195                "/",
196                data={
197                    "response_type": "code",
198                    "client_id": "test",
199                    "redirect_uri": "http://localhost",
200                },
201            )
202            OAuthAuthorizationParams.from_request(request)
203        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
204
205    def test_redirect_uri_regex(self):
206        """test valid redirect URI (regex)"""
207        OAuth2Provider.objects.create(
208            name=generate_id(),
209            client_id="test",
210            authorization_flow=create_test_flow(),
211            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
212        )
213        request = self.factory.get(
214            "/",
215            data={
216                "response_type": "code",
217                "client_id": "test",
218                "redirect_uri": "http://foo.bar.baz",
219            },
220        )
221        OAuthAuthorizationParams.from_request(request)
222
223    @apply_blueprint("system/providers-oauth2.yaml")
224    def test_response_type(self):
225        """test response_type"""
226        provider = OAuth2Provider.objects.create(
227            name=generate_id(),
228            client_id="test",
229            authorization_flow=create_test_flow(),
230            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
231        )
232        provider.property_mappings.set(
233            ScopeMapping.objects.filter(
234                managed__in=[
235                    "goauthentik.io/providers/oauth2/scope-openid",
236                    "goauthentik.io/providers/oauth2/scope-email",
237                    "goauthentik.io/providers/oauth2/scope-profile",
238                ]
239            )
240        )
241        request = self.factory.get(
242            "/",
243            data={
244                "response_type": "code",
245                "client_id": "test",
246                "redirect_uri": "http://local.invalid/Foo",
247            },
248        )
249        self.assertEqual(
250            OAuthAuthorizationParams.from_request(request).grant_type,
251            GrantTypes.AUTHORIZATION_CODE,
252        )
253        self.assertEqual(
254            OAuthAuthorizationParams.from_request(request).redirect_uri,
255            "http://local.invalid/Foo",
256        )
257        request = self.factory.get(
258            "/",
259            data={
260                "response_type": "id_token",
261                "client_id": "test",
262                "redirect_uri": "http://local.invalid/Foo",
263                "scope": "openid",
264                "state": "foo",
265                "nonce": generate_id(),
266            },
267        )
268        self.assertEqual(
269            OAuthAuthorizationParams.from_request(request).grant_type,
270            GrantTypes.IMPLICIT,
271        )
272        # Implicit without openid scope
273        with self.assertRaises(AuthorizeError) as cm:
274            request = self.factory.get(
275                "/",
276                data={
277                    "response_type": "id_token",
278                    "client_id": "test",
279                    "redirect_uri": "http://local.invalid/Foo",
280                    "state": "foo",
281                },
282            )
283            self.assertEqual(
284                OAuthAuthorizationParams.from_request(request).grant_type,
285                GrantTypes.IMPLICIT,
286            )
287        request = self.factory.get(
288            "/",
289            data={
290                "response_type": "code token",
291                "client_id": "test",
292                "redirect_uri": "http://local.invalid/Foo",
293                "scope": "openid",
294                "state": "foo",
295            },
296        )
297        self.assertEqual(
298            OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID
299        )
300        with self.assertRaises(AuthorizeError) as cm:
301            request = self.factory.get(
302                "/",
303                data={
304                    "response_type": "invalid",
305                    "client_id": "test",
306                    "redirect_uri": "http://local.invalid/Foo",
307                },
308            )
309            OAuthAuthorizationParams.from_request(request)
310        self.assertEqual(cm.exception.error, "unsupported_response_type")
311
312    def test_full_code(self):
313        """Test full authorization"""
314        flow = create_test_flow()
315        provider = OAuth2Provider.objects.create(
316            name=generate_id(),
317            client_id="test",
318            authorization_flow=flow,
319            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
320            access_code_validity="seconds=100",
321        )
322        Application.objects.create(name="app", slug="app", provider=provider)
323        state = generate_id()
324        user = create_test_admin_user()
325        self.client.force_login(user)
326        # Step 1, initiate params and get redirect to flow
327        response = self.client.get(
328            reverse("authentik_providers_oauth2:authorize"),
329            data={
330                "response_type": "code",
331                "client_id": "test",
332                "state": state,
333                "redirect_uri": "foo://localhost",
334            },
335        )
336        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
337        self.assertEqual(
338            response.url,
339            f"foo://localhost?code={code.code}&state={state}",
340        )
341        self.assertAlmostEqual(
342            code.expires.timestamp() - now().timestamp(),
343            timedelta_from_string(provider.access_code_validity).total_seconds(),
344            delta=5,
345        )
346
347    @apply_blueprint("system/providers-oauth2.yaml")
348    def test_full_implicit(self):
349        """Test full authorization"""
350        flow = create_test_flow()
351        provider: OAuth2Provider = OAuth2Provider.objects.create(
352            name=generate_id(),
353            client_id="test",
354            authorization_flow=flow,
355            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
356            signing_key=self.keypair,
357        )
358        provider.property_mappings.set(
359            ScopeMapping.objects.filter(
360                managed__in=[
361                    "goauthentik.io/providers/oauth2/scope-openid",
362                    "goauthentik.io/providers/oauth2/scope-email",
363                    "goauthentik.io/providers/oauth2/scope-profile",
364                ]
365            )
366        )
367        provider.property_mappings.add(
368            ScopeMapping.objects.create(
369                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
370            )
371        )
372        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
373        state = generate_id()
374        user = create_test_admin_user()
375        self.client.force_login(user)
376        with patch(
377            "authentik.providers.oauth2.id_token.get_login_event",
378            MagicMock(
379                return_value=Event(
380                    action=EventAction.LOGIN,
381                    context={PLAN_CONTEXT_METHOD: "password"},
382                    created=now(),
383                )
384            ),
385        ):
386            # Step 1, initiate params and get redirect to flow
387            response = self.client.get(
388                reverse("authentik_providers_oauth2:authorize"),
389                data={
390                    "response_type": "id_token",
391                    "client_id": "test",
392                    "state": state,
393                    "scope": "openid test",
394                    "redirect_uri": "http://localhost",
395                    "nonce": generate_id(),
396                },
397            )
398            token: AccessToken = AccessToken.objects.filter(user=user).first()
399            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
400            self.assertEqual(
401                response.url,
402                (
403                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
404                    f"&token_type={TOKEN_TYPE}"
405                    f"&expires_in={int(expires)}&state={state}"
406                ),
407            )
408            jwt = self.validate_jwt(token, provider)
409            self.assertEqual(jwt["amr"], ["pwd"])
410            self.assertEqual(jwt["sub"], "foo")
411            self.assertAlmostEqual(
412                jwt["exp"] - now().timestamp(),
413                expires,
414                delta=5,
415            )
416
417    @apply_blueprint("system/providers-oauth2.yaml")
418    def test_full_implicit_enc(self):
419        """Test full authorization with encryption"""
420        flow = create_test_flow()
421        provider: OAuth2Provider = OAuth2Provider.objects.create(
422            name=generate_id(),
423            client_id="test",
424            authorization_flow=flow,
425            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
426            signing_key=self.keypair,
427            encryption_key=self.keypair,
428        )
429        provider.property_mappings.set(
430            ScopeMapping.objects.filter(
431                managed__in=[
432                    "goauthentik.io/providers/oauth2/scope-openid",
433                    "goauthentik.io/providers/oauth2/scope-email",
434                    "goauthentik.io/providers/oauth2/scope-profile",
435                ]
436            )
437        )
438        provider.property_mappings.add(
439            ScopeMapping.objects.create(
440                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
441            )
442        )
443        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
444        state = generate_id()
445        user = create_test_admin_user()
446        self.client.force_login(user)
447        with patch(
448            "authentik.providers.oauth2.id_token.get_login_event",
449            MagicMock(
450                return_value=Event(
451                    action=EventAction.LOGIN,
452                    context={PLAN_CONTEXT_METHOD: "password"},
453                    created=now(),
454                )
455            ),
456        ):
457            # Step 1, initiate params and get redirect to flow
458            response = self.client.get(
459                reverse("authentik_providers_oauth2:authorize"),
460                data={
461                    "response_type": "id_token",
462                    "client_id": "test",
463                    "state": state,
464                    "scope": "openid test",
465                    "redirect_uri": "http://localhost",
466                    "nonce": generate_id(),
467                },
468            )
469            self.assertEqual(response.status_code, 302)
470            token: AccessToken = AccessToken.objects.filter(user=user).first()
471            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
472            jwt = self.validate_jwe(token, provider)
473            self.assertEqual(jwt["amr"], ["pwd"])
474            self.assertEqual(jwt["sub"], "foo")
475            self.assertAlmostEqual(
476                jwt["exp"] - now().timestamp(),
477                expires,
478                delta=5,
479            )
480
481    def test_full_fragment_code(self):
482        """Test full authorization"""
483        flow = create_test_flow()
484        provider: OAuth2Provider = OAuth2Provider.objects.create(
485            name=generate_id(),
486            client_id="test",
487            authorization_flow=flow,
488            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
489            signing_key=self.keypair,
490        )
491        Application.objects.create(name="app", slug="app", provider=provider)
492        state = generate_id()
493        user = create_test_admin_user()
494        self.client.force_login(user)
495        with patch(
496            "authentik.providers.oauth2.id_token.get_login_event",
497            MagicMock(
498                return_value=Event(
499                    action=EventAction.LOGIN,
500                    context={PLAN_CONTEXT_METHOD: "password"},
501                    created=now(),
502                )
503            ),
504        ):
505            # Step 1, initiate params and get redirect to flow
506            response = self.client.get(
507                reverse("authentik_providers_oauth2:authorize"),
508                data={
509                    "response_type": "code",
510                    "response_mode": "fragment",
511                    "client_id": "test",
512                    "state": state,
513                    "scope": "openid",
514                    "redirect_uri": "http://localhost",
515                    "nonce": generate_id(),
516                },
517            )
518            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
519            self.assertEqual(
520                response.url,
521                f"http://localhost#code={code.code}&state={state}",
522            )
523            self.assertAlmostEqual(
524                code.expires.timestamp() - now().timestamp(),
525                timedelta_from_string(provider.access_code_validity).total_seconds(),
526                delta=5,
527            )
528
529    @apply_blueprint("system/providers-oauth2.yaml")
530    def test_full_form_post_id_token(self):
531        """Test full authorization (form_post response)"""
532        flow = create_test_flow()
533        provider = OAuth2Provider.objects.create(
534            name=generate_id(),
535            client_id=generate_id(),
536            authorization_flow=flow,
537            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
538            signing_key=self.keypair,
539        )
540        provider.property_mappings.set(
541            ScopeMapping.objects.filter(
542                managed__in=[
543                    "goauthentik.io/providers/oauth2/scope-openid",
544                    "goauthentik.io/providers/oauth2/scope-email",
545                    "goauthentik.io/providers/oauth2/scope-profile",
546                ]
547            )
548        )
549        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
550        state = generate_id()
551        user = create_test_admin_user()
552        self.client.force_login(user)
553        # Step 1, initiate params and get redirect to flow
554        self.client.get(
555            reverse("authentik_providers_oauth2:authorize"),
556            data={
557                "response_type": "id_token",
558                "response_mode": "form_post",
559                "client_id": provider.client_id,
560                "state": state,
561                "scope": "openid",
562                "redirect_uri": "http://localhost",
563                "nonce": generate_id(),
564            },
565        )
566        response = self.client.get(
567            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
568        )
569        token: AccessToken = AccessToken.objects.filter(user=user).first()
570        self.assertIsNotNone(token)
571        self.assertJSONEqual(
572            response.content.decode(),
573            {
574                "component": "ak-stage-autosubmit",
575                "url": "http://localhost",
576                "title": f"Redirecting to {app.name}...",
577                "attrs": {
578                    "id_token": provider.encode(token.id_token.to_dict()),
579                    "token_type": TOKEN_TYPE,
580                    "expires_in": "3600",
581                    "state": state,
582                },
583            },
584        )
585        self.validate_jwt(token, provider)
586
587    def test_full_form_post_code(self):
588        """Test full authorization (form_post response, code type)"""
589        flow = create_test_flow()
590        provider = OAuth2Provider.objects.create(
591            name=generate_id(),
592            client_id=generate_id(),
593            authorization_flow=flow,
594            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
595            signing_key=self.keypair,
596        )
597        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
598        state = generate_id()
599        user = create_test_admin_user()
600        self.client.force_login(user)
601        # Step 1, initiate params and get redirect to flow
602        self.client.get(
603            reverse("authentik_providers_oauth2:authorize"),
604            data={
605                "response_type": "code",
606                "response_mode": "form_post",
607                "client_id": provider.client_id,
608                "state": state,
609                "scope": "openid",
610                "redirect_uri": "http://localhost",
611            },
612        )
613        response = self.client.get(
614            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
615        )
616        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
617        self.assertJSONEqual(
618            response.content.decode(),
619            {
620                "component": "ak-stage-autosubmit",
621                "url": "http://localhost",
622                "title": f"Redirecting to {app.name}...",
623                "attrs": {
624                    "code": code.code,
625                    "state": state,
626                },
627            },
628        )
629
630    def test_openid_missing_invalid(self):
631        """test request requiring an OpenID scope to be set"""
632        OAuth2Provider.objects.create(
633            name=generate_id(),
634            client_id="test",
635            authorization_flow=create_test_flow(),
636            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
637        )
638        request = self.factory.get(
639            "/",
640            data={
641                "response_type": "id_token",
642                "client_id": "test",
643                "redirect_uri": "http://localhost",
644                "scope": "",
645            },
646        )
647        with self.assertRaises(AuthorizeError) as cm:
648            OAuthAuthorizationParams.from_request(request)
649        self.assertEqual(cm.exception.cause, "scope_openid_missing")
650
651    @apply_blueprint("system/providers-oauth2.yaml")
652    def test_offline_access_invalid(self):
653        """test request for offline_access with invalid response type"""
654        provider = OAuth2Provider.objects.create(
655            name=generate_id(),
656            client_id="test",
657            authorization_flow=create_test_flow(),
658            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
659        )
660        provider.property_mappings.set(
661            ScopeMapping.objects.filter(
662                managed__in=[
663                    "goauthentik.io/providers/oauth2/scope-openid",
664                    "goauthentik.io/providers/oauth2/scope-offline_access",
665                ]
666            )
667        )
668        request = self.factory.get(
669            "/",
670            data={
671                "response_type": "id_token",
672                "client_id": "test",
673                "redirect_uri": "http://localhost",
674                "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
675                "nonce": generate_id(),
676            },
677        )
678        parsed = OAuthAuthorizationParams.from_request(request)
679        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)
680
681    @apply_blueprint("default/flow-default-authentication-flow.yaml")
682    def test_ui_locales(self):
683        """Test OIDC ui_locales authorization"""
684        flow = create_test_flow()
685        provider = OAuth2Provider.objects.create(
686            name=generate_id(),
687            client_id="test",
688            authorization_flow=flow,
689            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
690            access_code_validity="seconds=100",
691        )
692        Application.objects.create(name="app", slug="app", provider=provider)
693        state = generate_id()
694        self.client.logout()
695        try:
696            response = self.client.get(
697                reverse("authentik_providers_oauth2:authorize"),
698                data={
699                    "response_type": "code",
700                    "client_id": "test",
701                    "state": state,
702                    "redirect_uri": "foo://localhost",
703                    "ui_locales": "invalid fr",
704                },
705            )
706            parsed = parse_qs(urlparse(response.url).query)
707            self.assertEqual(parsed["locale"], ["fr"])
708        finally:
709            translation.deactivate()
710
711    @apply_blueprint("default/flow-default-authentication-flow.yaml")
712    def test_ui_locales_invalid(self):
713        """Test OIDC ui_locales authorization"""
714        flow = create_test_flow()
715        provider = OAuth2Provider.objects.create(
716            name=generate_id(),
717            client_id="test",
718            authorization_flow=flow,
719            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
720            access_code_validity="seconds=100",
721        )
722        Application.objects.create(name="app", slug="app", provider=provider)
723        state = generate_id()
724        self.client.logout()
725        response = self.client.get(
726            reverse("authentik_providers_oauth2:authorize"),
727            data={
728                "response_type": "code",
729                "client_id": "test",
730                "state": state,
731                "redirect_uri": "foo://localhost",
732                "ui_locales": "invalid",
733            },
734        )
735        parsed = parse_qs(urlparse(response.url).query)
736        self.assertNotIn("locale", parsed)
737
738    def test_authentication_flow(self):
739        """Test custom authentication flow"""
740        brand = create_test_brand()
741        global_auth = create_test_flow()
742        FlowStageBinding.objects.create(
743            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
744        )
745        brand.flow_authentication = global_auth
746        brand.save()
747
748        flow = create_test_flow()
749        auth_flow = create_test_flow()
750        FlowStageBinding.objects.create(
751            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
752        )
753        provider = OAuth2Provider.objects.create(
754            name=generate_id(),
755            client_id="test",
756            authorization_flow=flow,
757            authentication_flow=auth_flow,
758            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
759            access_code_validity="seconds=100",
760        )
761        Application.objects.create(name="app", slug="app", provider=provider)
762        state = generate_id()
763        response = self.client.get(
764            reverse("authentik_providers_oauth2:authorize"),
765            data={
766                "response_type": "code",
767                "client_id": "test",
768                "state": state,
769                "redirect_uri": "foo://localhost",
770            },
771        )
772        self.assertEqual(response.status_code, 302)
773        self.assertIn(auth_flow.slug, response.url)
774        self.assertNotIn(global_auth.slug, response.url)
775
776    @apply_blueprint("default/flow-default-authentication-flow.yaml")
777    def test_login_hint(self):
778        """Login hint"""
779        flow = create_test_flow()
780        provider = OAuth2Provider.objects.create(
781            name=generate_id(),
782            client_id="test",
783            authorization_flow=flow,
784            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
785            access_code_validity="seconds=100",
786        )
787        Application.objects.create(name="app", slug="app", provider=provider)
788        state = generate_id()
789        response = self.client.get(
790            reverse("authentik_providers_oauth2:authorize"),
791            data={
792                "response_type": "code",
793                "client_id": "test",
794                "state": state,
795                "redirect_uri": "foo://localhost",
796                "login_hint": "foo",
797            },
798        )
799        self.assertEqual(response.status_code, 302)
800        plan = self.client.session.get(SESSION_KEY_PLAN)
801        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")

Test authorize view

def setUp(self) -> None:
41    def setUp(self) -> None:
42        super().setUp()
43        self.factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_invalid_grant_type(self):
45    def test_invalid_grant_type(self):
46        """Test with invalid grant type"""
47        OAuth2Provider.objects.create(
48            name=generate_id(),
49            client_id="test",
50            authorization_flow=create_test_flow(),
51            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
52        )
53        with self.assertRaises(AuthorizeError) as cm:
54            request = self.factory.get(
55                "/",
56                data={
57                    "response_type": "invalid",
58                    "client_id": "test",
59                    "redirect_uri": "http://local.invalid/Foo",
60                },
61            )
62            OAuthAuthorizationParams.from_request(request)
63        self.assertEqual(cm.exception.error, "unsupported_response_type")

Test with invalid grant type

def test_invalid_client_id(self):
65    def test_invalid_client_id(self):
66        """Test invalid client ID"""
67        with self.assertRaises(ClientIdError):
68            request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"})
69            OAuthAuthorizationParams.from_request(request)

Test invalid client ID

def test_request(self):
71    def test_request(self):
72        """test request param"""
73        OAuth2Provider.objects.create(
74            name=generate_id(),
75            client_id="test",
76            authorization_flow=create_test_flow(),
77            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
78        )
79        with self.assertRaises(AuthorizeError) as cm:
80            request = self.factory.get(
81                "/",
82                data={
83                    "response_type": "code",
84                    "client_id": "test",
85                    "redirect_uri": "http://local.invalid/Foo",
86                    "request": "foo",
87                },
88            )
89            OAuthAuthorizationParams.from_request(request)
90        self.assertEqual(cm.exception.error, "request_not_supported")

test request param

def test_invalid_redirect_uri_missing(self):
 92    def test_invalid_redirect_uri_missing(self):
 93        """test missing redirect URI"""
 94        OAuth2Provider.objects.create(
 95            name=generate_id(),
 96            client_id="test",
 97            authorization_flow=create_test_flow(),
 98            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
 99        )
100        with self.assertRaises(RedirectUriError) as cm:
101            request = self.factory.get("/", data={"response_type": "code", "client_id": "test"})
102            OAuthAuthorizationParams.from_request(request)
103        self.assertEqual(cm.exception.cause, "redirect_uri_missing")

test missing redirect URI

def test_invalid_redirect_uri(self):
105    def test_invalid_redirect_uri(self):
106        """test invalid redirect URI"""
107        OAuth2Provider.objects.create(
108            name=generate_id(),
109            client_id="test",
110            authorization_flow=create_test_flow(),
111            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
112        )
113        with self.assertRaises(RedirectUriError) as cm:
114            request = self.factory.get(
115                "/",
116                data={
117                    "response_type": "code",
118                    "client_id": "test",
119                    "redirect_uri": "http://localhost",
120                },
121            )
122            OAuthAuthorizationParams.from_request(request)
123        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

test invalid redirect URI

def test_blocked_redirect_uri(self):
125    def test_blocked_redirect_uri(self):
126        """test missing/invalid redirect URI"""
127        OAuth2Provider.objects.create(
128            name=generate_id(),
129            client_id="test",
130            authorization_flow=create_test_flow(),
131            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
132        )
133        with self.assertRaises(RedirectUriError) as cm:
134            request = self.factory.get(
135                "/",
136                data={
137                    "response_type": "code",
138                    "client_id": "test",
139                    "redirect_uri": "data:localhost",
140                },
141            )
142            OAuthAuthorizationParams.from_request(request)
143        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")

test missing/invalid redirect URI

def test_invalid_redirect_uri_empty(self):
145    def test_invalid_redirect_uri_empty(self):
146        """test missing/invalid redirect URI"""
147        provider = OAuth2Provider.objects.create(
148            name=generate_id(),
149            client_id="test",
150            authorization_flow=create_test_flow(),
151            redirect_uris=[],
152        )
153        request = self.factory.get(
154            "/",
155            data={
156                "response_type": "code",
157                "client_id": "test",
158                "redirect_uri": "+",
159            },
160        )
161        OAuthAuthorizationParams.from_request(request)
162        provider.refresh_from_db()
163        self.assertEqual(provider.redirect_uris, [RedirectURI(RedirectURIMatchingMode.STRICT, "+")])

test missing/invalid redirect URI

def test_invalid_redirect_uri_regex(self):
165    def test_invalid_redirect_uri_regex(self):
166        """test missing/invalid redirect URI"""
167        OAuth2Provider.objects.create(
168            name=generate_id(),
169            client_id="test",
170            authorization_flow=create_test_flow(),
171            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
172        )
173        with self.assertRaises(RedirectUriError) as cm:
174            request = self.factory.get(
175                "/",
176                data={
177                    "response_type": "code",
178                    "client_id": "test",
179                    "redirect_uri": "http://localhost",
180                },
181            )
182            OAuthAuthorizationParams.from_request(request)
183        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

test missing/invalid redirect URI

def test_redirect_uri_invalid_regex(self):
185    def test_redirect_uri_invalid_regex(self):
186        """test missing/invalid redirect URI (invalid regex)"""
187        OAuth2Provider.objects.create(
188            name=generate_id(),
189            client_id="test",
190            authorization_flow=create_test_flow(),
191            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
192        )
193        with self.assertRaises(RedirectUriError) as cm:
194            request = self.factory.get(
195                "/",
196                data={
197                    "response_type": "code",
198                    "client_id": "test",
199                    "redirect_uri": "http://localhost",
200                },
201            )
202            OAuthAuthorizationParams.from_request(request)
203        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

test missing/invalid redirect URI (invalid regex)

def test_redirect_uri_regex(self):
205    def test_redirect_uri_regex(self):
206        """test valid redirect URI (regex)"""
207        OAuth2Provider.objects.create(
208            name=generate_id(),
209            client_id="test",
210            authorization_flow=create_test_flow(),
211            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
212        )
213        request = self.factory.get(
214            "/",
215            data={
216                "response_type": "code",
217                "client_id": "test",
218                "redirect_uri": "http://foo.bar.baz",
219            },
220        )
221        OAuthAuthorizationParams.from_request(request)

test valid redirect URI (regex)

@apply_blueprint('system/providers-oauth2.yaml')
def test_response_type(self):
223    @apply_blueprint("system/providers-oauth2.yaml")
224    def test_response_type(self):
225        """test response_type"""
226        provider = OAuth2Provider.objects.create(
227            name=generate_id(),
228            client_id="test",
229            authorization_flow=create_test_flow(),
230            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
231        )
232        provider.property_mappings.set(
233            ScopeMapping.objects.filter(
234                managed__in=[
235                    "goauthentik.io/providers/oauth2/scope-openid",
236                    "goauthentik.io/providers/oauth2/scope-email",
237                    "goauthentik.io/providers/oauth2/scope-profile",
238                ]
239            )
240        )
241        request = self.factory.get(
242            "/",
243            data={
244                "response_type": "code",
245                "client_id": "test",
246                "redirect_uri": "http://local.invalid/Foo",
247            },
248        )
249        self.assertEqual(
250            OAuthAuthorizationParams.from_request(request).grant_type,
251            GrantTypes.AUTHORIZATION_CODE,
252        )
253        self.assertEqual(
254            OAuthAuthorizationParams.from_request(request).redirect_uri,
255            "http://local.invalid/Foo",
256        )
257        request = self.factory.get(
258            "/",
259            data={
260                "response_type": "id_token",
261                "client_id": "test",
262                "redirect_uri": "http://local.invalid/Foo",
263                "scope": "openid",
264                "state": "foo",
265                "nonce": generate_id(),
266            },
267        )
268        self.assertEqual(
269            OAuthAuthorizationParams.from_request(request).grant_type,
270            GrantTypes.IMPLICIT,
271        )
272        # Implicit without openid scope
273        with self.assertRaises(AuthorizeError) as cm:
274            request = self.factory.get(
275                "/",
276                data={
277                    "response_type": "id_token",
278                    "client_id": "test",
279                    "redirect_uri": "http://local.invalid/Foo",
280                    "state": "foo",
281                },
282            )
283            self.assertEqual(
284                OAuthAuthorizationParams.from_request(request).grant_type,
285                GrantTypes.IMPLICIT,
286            )
287        request = self.factory.get(
288            "/",
289            data={
290                "response_type": "code token",
291                "client_id": "test",
292                "redirect_uri": "http://local.invalid/Foo",
293                "scope": "openid",
294                "state": "foo",
295            },
296        )
297        self.assertEqual(
298            OAuthAuthorizationParams.from_request(request).grant_type, GrantTypes.HYBRID
299        )
300        with self.assertRaises(AuthorizeError) as cm:
301            request = self.factory.get(
302                "/",
303                data={
304                    "response_type": "invalid",
305                    "client_id": "test",
306                    "redirect_uri": "http://local.invalid/Foo",
307                },
308            )
309            OAuthAuthorizationParams.from_request(request)
310        self.assertEqual(cm.exception.error, "unsupported_response_type")

test response_type

def test_full_code(self):
312    def test_full_code(self):
313        """Test full authorization"""
314        flow = create_test_flow()
315        provider = OAuth2Provider.objects.create(
316            name=generate_id(),
317            client_id="test",
318            authorization_flow=flow,
319            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
320            access_code_validity="seconds=100",
321        )
322        Application.objects.create(name="app", slug="app", provider=provider)
323        state = generate_id()
324        user = create_test_admin_user()
325        self.client.force_login(user)
326        # Step 1, initiate params and get redirect to flow
327        response = self.client.get(
328            reverse("authentik_providers_oauth2:authorize"),
329            data={
330                "response_type": "code",
331                "client_id": "test",
332                "state": state,
333                "redirect_uri": "foo://localhost",
334            },
335        )
336        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
337        self.assertEqual(
338            response.url,
339            f"foo://localhost?code={code.code}&state={state}",
340        )
341        self.assertAlmostEqual(
342            code.expires.timestamp() - now().timestamp(),
343            timedelta_from_string(provider.access_code_validity).total_seconds(),
344            delta=5,
345        )

Test full authorization

@apply_blueprint('system/providers-oauth2.yaml')
def test_full_implicit(self):
347    @apply_blueprint("system/providers-oauth2.yaml")
348    def test_full_implicit(self):
349        """Test full authorization"""
350        flow = create_test_flow()
351        provider: OAuth2Provider = OAuth2Provider.objects.create(
352            name=generate_id(),
353            client_id="test",
354            authorization_flow=flow,
355            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
356            signing_key=self.keypair,
357        )
358        provider.property_mappings.set(
359            ScopeMapping.objects.filter(
360                managed__in=[
361                    "goauthentik.io/providers/oauth2/scope-openid",
362                    "goauthentik.io/providers/oauth2/scope-email",
363                    "goauthentik.io/providers/oauth2/scope-profile",
364                ]
365            )
366        )
367        provider.property_mappings.add(
368            ScopeMapping.objects.create(
369                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
370            )
371        )
372        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
373        state = generate_id()
374        user = create_test_admin_user()
375        self.client.force_login(user)
376        with patch(
377            "authentik.providers.oauth2.id_token.get_login_event",
378            MagicMock(
379                return_value=Event(
380                    action=EventAction.LOGIN,
381                    context={PLAN_CONTEXT_METHOD: "password"},
382                    created=now(),
383                )
384            ),
385        ):
386            # Step 1, initiate params and get redirect to flow
387            response = self.client.get(
388                reverse("authentik_providers_oauth2:authorize"),
389                data={
390                    "response_type": "id_token",
391                    "client_id": "test",
392                    "state": state,
393                    "scope": "openid test",
394                    "redirect_uri": "http://localhost",
395                    "nonce": generate_id(),
396                },
397            )
398            token: AccessToken = AccessToken.objects.filter(user=user).first()
399            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
400            self.assertEqual(
401                response.url,
402                (
403                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
404                    f"&token_type={TOKEN_TYPE}"
405                    f"&expires_in={int(expires)}&state={state}"
406                ),
407            )
408            jwt = self.validate_jwt(token, provider)
409            self.assertEqual(jwt["amr"], ["pwd"])
410            self.assertEqual(jwt["sub"], "foo")
411            self.assertAlmostEqual(
412                jwt["exp"] - now().timestamp(),
413                expires,
414                delta=5,
415            )

Test full authorization

@apply_blueprint('system/providers-oauth2.yaml')
def test_full_implicit_enc(self):
417    @apply_blueprint("system/providers-oauth2.yaml")
418    def test_full_implicit_enc(self):
419        """Test full authorization with encryption"""
420        flow = create_test_flow()
421        provider: OAuth2Provider = OAuth2Provider.objects.create(
422            name=generate_id(),
423            client_id="test",
424            authorization_flow=flow,
425            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
426            signing_key=self.keypair,
427            encryption_key=self.keypair,
428        )
429        provider.property_mappings.set(
430            ScopeMapping.objects.filter(
431                managed__in=[
432                    "goauthentik.io/providers/oauth2/scope-openid",
433                    "goauthentik.io/providers/oauth2/scope-email",
434                    "goauthentik.io/providers/oauth2/scope-profile",
435                ]
436            )
437        )
438        provider.property_mappings.add(
439            ScopeMapping.objects.create(
440                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
441            )
442        )
443        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
444        state = generate_id()
445        user = create_test_admin_user()
446        self.client.force_login(user)
447        with patch(
448            "authentik.providers.oauth2.id_token.get_login_event",
449            MagicMock(
450                return_value=Event(
451                    action=EventAction.LOGIN,
452                    context={PLAN_CONTEXT_METHOD: "password"},
453                    created=now(),
454                )
455            ),
456        ):
457            # Step 1, initiate params and get redirect to flow
458            response = self.client.get(
459                reverse("authentik_providers_oauth2:authorize"),
460                data={
461                    "response_type": "id_token",
462                    "client_id": "test",
463                    "state": state,
464                    "scope": "openid test",
465                    "redirect_uri": "http://localhost",
466                    "nonce": generate_id(),
467                },
468            )
469            self.assertEqual(response.status_code, 302)
470            token: AccessToken = AccessToken.objects.filter(user=user).first()
471            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
472            jwt = self.validate_jwe(token, provider)
473            self.assertEqual(jwt["amr"], ["pwd"])
474            self.assertEqual(jwt["sub"], "foo")
475            self.assertAlmostEqual(
476                jwt["exp"] - now().timestamp(),
477                expires,
478                delta=5,
479            )

Test full authorization with encryption

def test_full_fragment_code(self):
481    def test_full_fragment_code(self):
482        """Test full authorization"""
483        flow = create_test_flow()
484        provider: OAuth2Provider = OAuth2Provider.objects.create(
485            name=generate_id(),
486            client_id="test",
487            authorization_flow=flow,
488            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
489            signing_key=self.keypair,
490        )
491        Application.objects.create(name="app", slug="app", provider=provider)
492        state = generate_id()
493        user = create_test_admin_user()
494        self.client.force_login(user)
495        with patch(
496            "authentik.providers.oauth2.id_token.get_login_event",
497            MagicMock(
498                return_value=Event(
499                    action=EventAction.LOGIN,
500                    context={PLAN_CONTEXT_METHOD: "password"},
501                    created=now(),
502                )
503            ),
504        ):
505            # Step 1, initiate params and get redirect to flow
506            response = self.client.get(
507                reverse("authentik_providers_oauth2:authorize"),
508                data={
509                    "response_type": "code",
510                    "response_mode": "fragment",
511                    "client_id": "test",
512                    "state": state,
513                    "scope": "openid",
514                    "redirect_uri": "http://localhost",
515                    "nonce": generate_id(),
516                },
517            )
518            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
519            self.assertEqual(
520                response.url,
521                f"http://localhost#code={code.code}&state={state}",
522            )
523            self.assertAlmostEqual(
524                code.expires.timestamp() - now().timestamp(),
525                timedelta_from_string(provider.access_code_validity).total_seconds(),
526                delta=5,
527            )

Test full authorization

@apply_blueprint('system/providers-oauth2.yaml')
def test_full_form_post_id_token(self):
529    @apply_blueprint("system/providers-oauth2.yaml")
530    def test_full_form_post_id_token(self):
531        """Test full authorization (form_post response)"""
532        flow = create_test_flow()
533        provider = OAuth2Provider.objects.create(
534            name=generate_id(),
535            client_id=generate_id(),
536            authorization_flow=flow,
537            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
538            signing_key=self.keypair,
539        )
540        provider.property_mappings.set(
541            ScopeMapping.objects.filter(
542                managed__in=[
543                    "goauthentik.io/providers/oauth2/scope-openid",
544                    "goauthentik.io/providers/oauth2/scope-email",
545                    "goauthentik.io/providers/oauth2/scope-profile",
546                ]
547            )
548        )
549        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
550        state = generate_id()
551        user = create_test_admin_user()
552        self.client.force_login(user)
553        # Step 1, initiate params and get redirect to flow
554        self.client.get(
555            reverse("authentik_providers_oauth2:authorize"),
556            data={
557                "response_type": "id_token",
558                "response_mode": "form_post",
559                "client_id": provider.client_id,
560                "state": state,
561                "scope": "openid",
562                "redirect_uri": "http://localhost",
563                "nonce": generate_id(),
564            },
565        )
566        response = self.client.get(
567            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
568        )
569        token: AccessToken = AccessToken.objects.filter(user=user).first()
570        self.assertIsNotNone(token)
571        self.assertJSONEqual(
572            response.content.decode(),
573            {
574                "component": "ak-stage-autosubmit",
575                "url": "http://localhost",
576                "title": f"Redirecting to {app.name}...",
577                "attrs": {
578                    "id_token": provider.encode(token.id_token.to_dict()),
579                    "token_type": TOKEN_TYPE,
580                    "expires_in": "3600",
581                    "state": state,
582                },
583            },
584        )
585        self.validate_jwt(token, provider)

Test full authorization (form_post response)

def test_full_form_post_code(self):
587    def test_full_form_post_code(self):
588        """Test full authorization (form_post response, code type)"""
589        flow = create_test_flow()
590        provider = OAuth2Provider.objects.create(
591            name=generate_id(),
592            client_id=generate_id(),
593            authorization_flow=flow,
594            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
595            signing_key=self.keypair,
596        )
597        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
598        state = generate_id()
599        user = create_test_admin_user()
600        self.client.force_login(user)
601        # Step 1, initiate params and get redirect to flow
602        self.client.get(
603            reverse("authentik_providers_oauth2:authorize"),
604            data={
605                "response_type": "code",
606                "response_mode": "form_post",
607                "client_id": provider.client_id,
608                "state": state,
609                "scope": "openid",
610                "redirect_uri": "http://localhost",
611            },
612        )
613        response = self.client.get(
614            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
615        )
616        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
617        self.assertJSONEqual(
618            response.content.decode(),
619            {
620                "component": "ak-stage-autosubmit",
621                "url": "http://localhost",
622                "title": f"Redirecting to {app.name}...",
623                "attrs": {
624                    "code": code.code,
625                    "state": state,
626                },
627            },
628        )

Test full authorization (form_post response, code type)

def test_openid_missing_invalid(self):
630    def test_openid_missing_invalid(self):
631        """test request requiring an OpenID scope to be set"""
632        OAuth2Provider.objects.create(
633            name=generate_id(),
634            client_id="test",
635            authorization_flow=create_test_flow(),
636            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
637        )
638        request = self.factory.get(
639            "/",
640            data={
641                "response_type": "id_token",
642                "client_id": "test",
643                "redirect_uri": "http://localhost",
644                "scope": "",
645            },
646        )
647        with self.assertRaises(AuthorizeError) as cm:
648            OAuthAuthorizationParams.from_request(request)
649        self.assertEqual(cm.exception.cause, "scope_openid_missing")

test request requiring an OpenID scope to be set

@apply_blueprint('system/providers-oauth2.yaml')
def test_offline_access_invalid(self):
651    @apply_blueprint("system/providers-oauth2.yaml")
652    def test_offline_access_invalid(self):
653        """test request for offline_access with invalid response type"""
654        provider = OAuth2Provider.objects.create(
655            name=generate_id(),
656            client_id="test",
657            authorization_flow=create_test_flow(),
658            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
659        )
660        provider.property_mappings.set(
661            ScopeMapping.objects.filter(
662                managed__in=[
663                    "goauthentik.io/providers/oauth2/scope-openid",
664                    "goauthentik.io/providers/oauth2/scope-offline_access",
665                ]
666            )
667        )
668        request = self.factory.get(
669            "/",
670            data={
671                "response_type": "id_token",
672                "client_id": "test",
673                "redirect_uri": "http://localhost",
674                "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
675                "nonce": generate_id(),
676            },
677        )
678        parsed = OAuthAuthorizationParams.from_request(request)
679        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)

test request for offline_access with invalid response type

@apply_blueprint('default/flow-default-authentication-flow.yaml')
def test_ui_locales(self):
681    @apply_blueprint("default/flow-default-authentication-flow.yaml")
682    def test_ui_locales(self):
683        """Test OIDC ui_locales authorization"""
684        flow = create_test_flow()
685        provider = OAuth2Provider.objects.create(
686            name=generate_id(),
687            client_id="test",
688            authorization_flow=flow,
689            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
690            access_code_validity="seconds=100",
691        )
692        Application.objects.create(name="app", slug="app", provider=provider)
693        state = generate_id()
694        self.client.logout()
695        try:
696            response = self.client.get(
697                reverse("authentik_providers_oauth2:authorize"),
698                data={
699                    "response_type": "code",
700                    "client_id": "test",
701                    "state": state,
702                    "redirect_uri": "foo://localhost",
703                    "ui_locales": "invalid fr",
704                },
705            )
706            parsed = parse_qs(urlparse(response.url).query)
707            self.assertEqual(parsed["locale"], ["fr"])
708        finally:
709            translation.deactivate()

Test OIDC ui_locales authorization

@apply_blueprint('default/flow-default-authentication-flow.yaml')
def test_ui_locales_invalid(self):
711    @apply_blueprint("default/flow-default-authentication-flow.yaml")
712    def test_ui_locales_invalid(self):
713        """Test OIDC ui_locales authorization"""
714        flow = create_test_flow()
715        provider = OAuth2Provider.objects.create(
716            name=generate_id(),
717            client_id="test",
718            authorization_flow=flow,
719            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
720            access_code_validity="seconds=100",
721        )
722        Application.objects.create(name="app", slug="app", provider=provider)
723        state = generate_id()
724        self.client.logout()
725        response = self.client.get(
726            reverse("authentik_providers_oauth2:authorize"),
727            data={
728                "response_type": "code",
729                "client_id": "test",
730                "state": state,
731                "redirect_uri": "foo://localhost",
732                "ui_locales": "invalid",
733            },
734        )
735        parsed = parse_qs(urlparse(response.url).query)
736        self.assertNotIn("locale", parsed)

Test OIDC ui_locales authorization

def test_authentication_flow(self):
738    def test_authentication_flow(self):
739        """Test custom authentication flow"""
740        brand = create_test_brand()
741        global_auth = create_test_flow()
742        FlowStageBinding.objects.create(
743            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
744        )
745        brand.flow_authentication = global_auth
746        brand.save()
747
748        flow = create_test_flow()
749        auth_flow = create_test_flow()
750        FlowStageBinding.objects.create(
751            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
752        )
753        provider = OAuth2Provider.objects.create(
754            name=generate_id(),
755            client_id="test",
756            authorization_flow=flow,
757            authentication_flow=auth_flow,
758            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
759            access_code_validity="seconds=100",
760        )
761        Application.objects.create(name="app", slug="app", provider=provider)
762        state = generate_id()
763        response = self.client.get(
764            reverse("authentik_providers_oauth2:authorize"),
765            data={
766                "response_type": "code",
767                "client_id": "test",
768                "state": state,
769                "redirect_uri": "foo://localhost",
770            },
771        )
772        self.assertEqual(response.status_code, 302)
773        self.assertIn(auth_flow.slug, response.url)
774        self.assertNotIn(global_auth.slug, response.url)

Test custom authentication flow

@apply_blueprint('default/flow-default-authentication-flow.yaml')
def test_login_hint(self):
776    @apply_blueprint("default/flow-default-authentication-flow.yaml")
777    def test_login_hint(self):
778        """Login hint"""
779        flow = create_test_flow()
780        provider = OAuth2Provider.objects.create(
781            name=generate_id(),
782            client_id="test",
783            authorization_flow=flow,
784            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
785            access_code_validity="seconds=100",
786        )
787        Application.objects.create(name="app", slug="app", provider=provider)
788        state = generate_id()
789        response = self.client.get(
790            reverse("authentik_providers_oauth2:authorize"),
791            data={
792                "response_type": "code",
793                "client_id": "test",
794                "state": state,
795                "redirect_uri": "foo://localhost",
796                "login_hint": "foo",
797            },
798        )
799        self.assertEqual(response.status_code, 302)
800        plan = self.client.session.get(SESSION_KEY_PLAN)
801        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")

Login hint