authentik.providers.oauth2.tests.test_authorize

Test authorize view

  1"""Test authorize view"""
  2
  3from unittest.mock import MagicMock, patch
  4from urllib.parse import parse_qs, urlparse
  5
  6from django.test import RequestFactory
  7from django.urls import reverse
  8from django.utils import translation
  9from django.utils.timezone import now
 10
 11from authentik.blueprints.tests import apply_blueprint
 12from authentik.common.oauth.constants import SCOPE_OFFLINE_ACCESS, SCOPE_OPENID, TOKEN_TYPE
 13from authentik.core.models import Application
 14from authentik.core.tests.utils import create_test_admin_user, create_test_brand, create_test_flow
 15from authentik.events.models import Event, EventAction
 16from authentik.flows.models import FlowStageBinding
 17from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
 18from authentik.flows.views.executor import SESSION_KEY_PLAN
 19from authentik.lib.generators import generate_id
 20from authentik.lib.utils.time import timedelta_from_string
 21from authentik.providers.oauth2.errors import AuthorizeError, ClientIdError, RedirectUriError
 22from authentik.providers.oauth2.models import (
 23    AccessToken,
 24    AuthorizationCode,
 25    GrantType,
 26    OAuth2Provider,
 27    RedirectURI,
 28    RedirectURIMatchingMode,
 29    ScopeMapping,
 30)
 31from authentik.providers.oauth2.tests.utils import OAuthTestCase
 32from authentik.providers.oauth2.views.authorize import OAuthAuthorizationParams
 33from authentik.stages.dummy.models import DummyStage
 34from authentik.stages.password.stage import PLAN_CONTEXT_METHOD
 35
 36
 37class TestAuthorize(OAuthTestCase):
 38    """Test authorize view"""
 39
 40    def setUp(self) -> None:
 41        super().setUp()
 42        self.factory = RequestFactory()
 43
 44    def test_disallowed_grant_type(self):
 45        """Test with disallowed grant type"""
 46        OAuth2Provider.objects.create(
 47            name=generate_id(),
 48            client_id="test",
 49            grant_types=[],
 50            authorization_flow=create_test_flow(),
 51            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 52        )
 53        with self.assertRaises(AuthorizeError) as cm:
 54            request = self.factory.get(
 55                "/",
 56                data={
 57                    "response_type": "code",
 58                    "client_id": "test",
 59                    "redirect_uri": "http://local.invalid/Foo",
 60                },
 61            )
 62            OAuthAuthorizationParams.from_request(request)
 63        self.assertEqual(cm.exception.error, "invalid_request")
 64
 65    def test_invalid_grant_type(self):
 66        """Test with invalid grant type"""
 67        OAuth2Provider.objects.create(
 68            name=generate_id(),
 69            client_id="test",
 70            authorization_flow=create_test_flow(),
 71            grant_types=[GrantType.AUTHORIZATION_CODE],
 72            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 73        )
 74        with self.assertRaises(AuthorizeError) as cm:
 75            request = self.factory.get(
 76                "/",
 77                data={
 78                    "response_type": "invalid",
 79                    "client_id": "test",
 80                    "redirect_uri": "http://local.invalid/Foo",
 81                },
 82            )
 83            OAuthAuthorizationParams.from_request(request)
 84        self.assertEqual(cm.exception.error, "unsupported_response_type")
 85
 86    def test_invalid_client_id(self):
 87        """Test invalid client ID"""
 88        with self.assertRaises(ClientIdError):
 89            request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"})
 90            OAuthAuthorizationParams.from_request(request)
 91
 92    def test_request(self):
 93        """test request param"""
 94        OAuth2Provider.objects.create(
 95            name=generate_id(),
 96            client_id="test",
 97            authorization_flow=create_test_flow(),
 98            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 99            grant_types=[GrantType.AUTHORIZATION_CODE],
100        )
101        with self.assertRaises(AuthorizeError) as cm:
102            request = self.factory.get(
103                "/",
104                data={
105                    "response_type": "code",
106                    "client_id": "test",
107                    "redirect_uri": "http://local.invalid/Foo",
108                    "request": "foo",
109                },
110            )
111            OAuthAuthorizationParams.from_request(request)
112        self.assertEqual(cm.exception.error, "request_not_supported")
113
114    def test_invalid_redirect_uri_missing(self):
115        """test missing redirect URI"""
116        OAuth2Provider.objects.create(
117            name=generate_id(),
118            client_id="test",
119            authorization_flow=create_test_flow(),
120            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
121        )
122        with self.assertRaises(RedirectUriError) as cm:
123            request = self.factory.get("/", data={"response_type": "code", "client_id": "test"})
124            OAuthAuthorizationParams.from_request(request)
125        self.assertEqual(cm.exception.cause, "redirect_uri_missing")
126
127    def test_invalid_redirect_uri(self):
128        """test invalid redirect URI"""
129        OAuth2Provider.objects.create(
130            name=generate_id(),
131            client_id="test",
132            authorization_flow=create_test_flow(),
133            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
134        )
135        with self.assertRaises(RedirectUriError) as cm:
136            request = self.factory.get(
137                "/",
138                data={
139                    "response_type": "code",
140                    "client_id": "test",
141                    "redirect_uri": "http://localhost",
142                },
143            )
144            OAuthAuthorizationParams.from_request(request)
145        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
146
147    def test_blocked_redirect_uri(self):
148        """test missing/invalid redirect URI"""
149        OAuth2Provider.objects.create(
150            name=generate_id(),
151            client_id="test",
152            authorization_flow=create_test_flow(),
153            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
154        )
155        with self.assertRaises(RedirectUriError) as cm:
156            request = self.factory.get(
157                "/",
158                data={
159                    "response_type": "code",
160                    "client_id": "test",
161                    "redirect_uri": "data:localhost",
162                },
163            )
164            OAuthAuthorizationParams.from_request(request)
165        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")
166
167    def test_invalid_redirect_uri_regex(self):
168        """test missing/invalid redirect URI"""
169        OAuth2Provider.objects.create(
170            name=generate_id(),
171            client_id="test",
172            authorization_flow=create_test_flow(),
173            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
174        )
175        with self.assertRaises(RedirectUriError) as cm:
176            request = self.factory.get(
177                "/",
178                data={
179                    "response_type": "code",
180                    "client_id": "test",
181                    "redirect_uri": "http://localhost",
182                },
183            )
184            OAuthAuthorizationParams.from_request(request)
185        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
186
187    def test_redirect_uri_invalid_regex(self):
188        """test missing/invalid redirect URI (invalid regex)"""
189        OAuth2Provider.objects.create(
190            name=generate_id(),
191            client_id="test",
192            authorization_flow=create_test_flow(),
193            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
194        )
195        with self.assertRaises(RedirectUriError) as cm:
196            request = self.factory.get(
197                "/",
198                data={
199                    "response_type": "code",
200                    "client_id": "test",
201                    "redirect_uri": "http://localhost",
202                },
203            )
204            OAuthAuthorizationParams.from_request(request)
205        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
206
207    def test_redirect_uri_regex(self):
208        """test valid redirect URI (regex)"""
209        OAuth2Provider.objects.create(
210            name=generate_id(),
211            client_id="test",
212            authorization_flow=create_test_flow(),
213            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
214            grant_types=[GrantType.AUTHORIZATION_CODE],
215        )
216        request = self.factory.get(
217            "/",
218            data={
219                "response_type": "code",
220                "client_id": "test",
221                "redirect_uri": "http://foo.bar.baz",
222            },
223        )
224        OAuthAuthorizationParams.from_request(request)
225
226    @apply_blueprint("system/providers-oauth2.yaml")
227    def test_response_type(self):
228        """test response_type"""
229        provider = OAuth2Provider.objects.create(
230            name=generate_id(),
231            client_id="test",
232            authorization_flow=create_test_flow(),
233            grant_types=[GrantType.AUTHORIZATION_CODE],
234            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
235        )
236        provider.property_mappings.set(
237            ScopeMapping.objects.filter(
238                managed__in=[
239                    "goauthentik.io/providers/oauth2/scope-openid",
240                    "goauthentik.io/providers/oauth2/scope-email",
241                    "goauthentik.io/providers/oauth2/scope-profile",
242                ]
243            )
244        )
245        request = self.factory.get(
246            "/",
247            data={
248                "response_type": "code",
249                "client_id": "test",
250                "redirect_uri": "http://local.invalid/Foo",
251            },
252        )
253        self.assertEqual(
254            OAuthAuthorizationParams.from_request(request).grant_type,
255            GrantType.AUTHORIZATION_CODE,
256        )
257        self.assertEqual(
258            OAuthAuthorizationParams.from_request(request).redirect_uri,
259            "http://local.invalid/Foo",
260        )
261        provider.grant_types = [GrantType.IMPLICIT]
262        provider.save()
263        request = self.factory.get(
264            "/",
265            data={
266                "response_type": "id_token",
267                "client_id": "test",
268                "redirect_uri": "http://local.invalid/Foo",
269                "scope": "openid",
270                "state": "foo",
271                "nonce": generate_id(),
272            },
273        )
274        self.assertEqual(
275            OAuthAuthorizationParams.from_request(request).grant_type,
276            GrantType.IMPLICIT,
277        )
278        # Implicit without openid scope
279        with self.assertRaises(AuthorizeError) as cm:
280            request = self.factory.get(
281                "/",
282                data={
283                    "response_type": "id_token",
284                    "client_id": "test",
285                    "redirect_uri": "http://local.invalid/Foo",
286                    "state": "foo",
287                },
288            )
289            self.assertEqual(
290                OAuthAuthorizationParams.from_request(request).grant_type,
291                GrantType.IMPLICIT,
292            )
293        provider.grant_types = [GrantType.HYBRID]
294        provider.save()
295        request = self.factory.get(
296            "/",
297            data={
298                "response_type": "code token",
299                "client_id": "test",
300                "redirect_uri": "http://local.invalid/Foo",
301                "scope": "openid",
302                "state": "foo",
303            },
304        )
305        self.assertEqual(
306            OAuthAuthorizationParams.from_request(request).grant_type, GrantType.HYBRID
307        )
308        with self.assertRaises(AuthorizeError) as cm:
309            request = self.factory.get(
310                "/",
311                data={
312                    "response_type": "invalid",
313                    "client_id": "test",
314                    "redirect_uri": "http://local.invalid/Foo",
315                },
316            )
317            OAuthAuthorizationParams.from_request(request)
318        self.assertEqual(cm.exception.error, "unsupported_response_type")
319
320    def test_full_code(self):
321        """Test full authorization"""
322        flow = create_test_flow()
323        provider = OAuth2Provider.objects.create(
324            name=generate_id(),
325            client_id="test",
326            authorization_flow=flow,
327            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
328            access_code_validity="seconds=100",
329            grant_types=[GrantType.AUTHORIZATION_CODE],
330        )
331        Application.objects.create(name="app", slug="app", provider=provider)
332        state = generate_id()
333        user = create_test_admin_user()
334        self.client.force_login(user)
335        # Step 1, initiate params and get redirect to flow
336        response = self.client.get(
337            reverse("authentik_providers_oauth2:authorize"),
338            data={
339                "response_type": "code",
340                "client_id": "test",
341                "state": state,
342                "redirect_uri": "foo://localhost",
343            },
344        )
345        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
346        self.assertEqual(
347            response.url,
348            f"foo://localhost?code={code.code}&state={state}",
349        )
350        self.assertAlmostEqual(
351            code.expires.timestamp() - now().timestamp(),
352            timedelta_from_string(provider.access_code_validity).total_seconds(),
353            delta=5,
354        )
355
356    @apply_blueprint("system/providers-oauth2.yaml")
357    def test_full_implicit(self):
358        """Test full authorization"""
359        flow = create_test_flow()
360        provider: OAuth2Provider = OAuth2Provider.objects.create(
361            name=generate_id(),
362            client_id="test",
363            authorization_flow=flow,
364            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
365            signing_key=self.keypair,
366            grant_types=[GrantType.IMPLICIT],
367        )
368        provider.property_mappings.set(
369            ScopeMapping.objects.filter(
370                managed__in=[
371                    "goauthentik.io/providers/oauth2/scope-openid",
372                    "goauthentik.io/providers/oauth2/scope-email",
373                    "goauthentik.io/providers/oauth2/scope-profile",
374                ]
375            )
376        )
377        provider.property_mappings.add(
378            ScopeMapping.objects.create(
379                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
380            )
381        )
382        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
383        state = generate_id()
384        user = create_test_admin_user()
385        self.client.force_login(user)
386        with patch(
387            "authentik.providers.oauth2.id_token.get_login_event",
388            MagicMock(
389                return_value=Event(
390                    action=EventAction.LOGIN,
391                    context={PLAN_CONTEXT_METHOD: "password"},
392                    created=now(),
393                )
394            ),
395        ):
396            # Step 1, initiate params and get redirect to flow
397            response = self.client.get(
398                reverse("authentik_providers_oauth2:authorize"),
399                data={
400                    "response_type": "id_token",
401                    "client_id": "test",
402                    "state": state,
403                    "scope": "openid test",
404                    "redirect_uri": "http://localhost",
405                    "nonce": generate_id(),
406                },
407            )
408            token = AccessToken.objects.filter(user=user).first()
409            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
410            self.assertEqual(
411                response.url,
412                (
413                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
414                    f"&token_type={TOKEN_TYPE}"
415                    f"&expires_in={int(expires)}&state={state}"
416                ),
417            )
418            jwt = self.validate_jwt(token, provider)
419            self.assertEqual(jwt["amr"], ["pwd"])
420            self.assertEqual(jwt["sub"], "foo")
421            self.assertAlmostEqual(
422                jwt["exp"] - now().timestamp(),
423                expires,
424                delta=5,
425            )
426
427    @apply_blueprint("system/providers-oauth2.yaml")
428    def test_full_implicit_enc(self):
429        """Test full authorization with encryption"""
430        flow = create_test_flow()
431        provider: OAuth2Provider = OAuth2Provider.objects.create(
432            name=generate_id(),
433            client_id="test",
434            authorization_flow=flow,
435            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
436            signing_key=self.keypair,
437            encryption_key=self.keypair,
438            grant_types=[GrantType.IMPLICIT],
439        )
440        provider.property_mappings.set(
441            ScopeMapping.objects.filter(
442                managed__in=[
443                    "goauthentik.io/providers/oauth2/scope-openid",
444                    "goauthentik.io/providers/oauth2/scope-email",
445                    "goauthentik.io/providers/oauth2/scope-profile",
446                ]
447            )
448        )
449        provider.property_mappings.add(
450            ScopeMapping.objects.create(
451                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
452            )
453        )
454        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
455        state = generate_id()
456        user = create_test_admin_user()
457        self.client.force_login(user)
458        with patch(
459            "authentik.providers.oauth2.id_token.get_login_event",
460            MagicMock(
461                return_value=Event(
462                    action=EventAction.LOGIN,
463                    context={PLAN_CONTEXT_METHOD: "password"},
464                    created=now(),
465                )
466            ),
467        ):
468            # Step 1, initiate params and get redirect to flow
469            response = self.client.get(
470                reverse("authentik_providers_oauth2:authorize"),
471                data={
472                    "response_type": "id_token",
473                    "client_id": "test",
474                    "state": state,
475                    "scope": "openid test",
476                    "redirect_uri": "http://localhost",
477                    "nonce": generate_id(),
478                },
479            )
480            self.assertEqual(response.status_code, 302)
481            token = AccessToken.objects.filter(user=user).first()
482            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
483            jwt = self.validate_jwe(token, provider)
484            self.assertEqual(jwt["amr"], ["pwd"])
485            self.assertEqual(jwt["sub"], "foo")
486            self.assertAlmostEqual(
487                jwt["exp"] - now().timestamp(),
488                expires,
489                delta=5,
490            )
491
492    def test_full_fragment_code(self):
493        """Test full authorization"""
494        flow = create_test_flow()
495        provider: OAuth2Provider = OAuth2Provider.objects.create(
496            name=generate_id(),
497            client_id="test",
498            authorization_flow=flow,
499            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
500            signing_key=self.keypair,
501            grant_types=[GrantType.AUTHORIZATION_CODE],
502        )
503        Application.objects.create(name="app", slug="app", provider=provider)
504        state = generate_id()
505        user = create_test_admin_user()
506        self.client.force_login(user)
507        with patch(
508            "authentik.providers.oauth2.id_token.get_login_event",
509            MagicMock(
510                return_value=Event(
511                    action=EventAction.LOGIN,
512                    context={PLAN_CONTEXT_METHOD: "password"},
513                    created=now(),
514                )
515            ),
516        ):
517            # Step 1, initiate params and get redirect to flow
518            response = self.client.get(
519                reverse("authentik_providers_oauth2:authorize"),
520                data={
521                    "response_type": "code",
522                    "response_mode": "fragment",
523                    "client_id": "test",
524                    "state": state,
525                    "scope": "openid",
526                    "redirect_uri": "http://localhost",
527                    "nonce": generate_id(),
528                },
529            )
530            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
531            self.assertEqual(
532                response.url,
533                f"http://localhost#code={code.code}&state={state}",
534            )
535            self.assertAlmostEqual(
536                code.expires.timestamp() - now().timestamp(),
537                timedelta_from_string(provider.access_code_validity).total_seconds(),
538                delta=5,
539            )
540
541    @apply_blueprint("system/providers-oauth2.yaml")
542    def test_full_form_post_id_token(self):
543        """Test full authorization (form_post response)"""
544        flow = create_test_flow()
545        provider = OAuth2Provider.objects.create(
546            name=generate_id(),
547            client_id=generate_id(),
548            authorization_flow=flow,
549            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
550            signing_key=self.keypair,
551            grant_types=[GrantType.IMPLICIT],
552        )
553        provider.property_mappings.set(
554            ScopeMapping.objects.filter(
555                managed__in=[
556                    "goauthentik.io/providers/oauth2/scope-openid",
557                    "goauthentik.io/providers/oauth2/scope-email",
558                    "goauthentik.io/providers/oauth2/scope-profile",
559                ]
560            )
561        )
562        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
563        state = generate_id()
564        user = create_test_admin_user()
565        self.client.force_login(user)
566        # Step 1, initiate params and get redirect to flow
567        self.client.get(
568            reverse("authentik_providers_oauth2:authorize"),
569            data={
570                "response_type": "id_token",
571                "response_mode": "form_post",
572                "client_id": provider.client_id,
573                "state": state,
574                "scope": "openid",
575                "redirect_uri": "http://localhost",
576                "nonce": generate_id(),
577            },
578        )
579        response = self.client.get(
580            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
581        )
582        token = AccessToken.objects.filter(user=user).first()
583        self.assertIsNotNone(token)
584        self.assertJSONEqual(
585            response.content.decode(),
586            {
587                "component": "ak-stage-autosubmit",
588                "url": "http://localhost",
589                "title": f"Redirecting to {app.name}...",
590                "attrs": {
591                    "id_token": provider.encode(token.id_token.to_dict()),
592                    "token_type": TOKEN_TYPE,
593                    "expires_in": "3600",
594                    "state": state,
595                },
596            },
597        )
598        self.validate_jwt(token, provider)
599
600    def test_full_form_post_code(self):
601        """Test full authorization (form_post response, code type)"""
602        flow = create_test_flow()
603        provider = OAuth2Provider.objects.create(
604            name=generate_id(),
605            client_id=generate_id(),
606            authorization_flow=flow,
607            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
608            signing_key=self.keypair,
609            grant_types=[GrantType.AUTHORIZATION_CODE],
610        )
611        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
612        state = generate_id()
613        user = create_test_admin_user()
614        self.client.force_login(user)
615        # Step 1, initiate params and get redirect to flow
616        self.client.get(
617            reverse("authentik_providers_oauth2:authorize"),
618            data={
619                "response_type": "code",
620                "response_mode": "form_post",
621                "client_id": provider.client_id,
622                "state": state,
623                "scope": "openid",
624                "redirect_uri": "http://localhost",
625            },
626        )
627        response = self.client.get(
628            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
629        )
630        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
631        self.assertJSONEqual(
632            response.content.decode(),
633            {
634                "component": "ak-stage-autosubmit",
635                "url": "http://localhost",
636                "title": f"Redirecting to {app.name}...",
637                "attrs": {
638                    "code": code.code,
639                    "state": state,
640                },
641            },
642        )
643
644    def test_openid_missing_invalid(self):
645        """test request requiring an OpenID scope to be set"""
646        OAuth2Provider.objects.create(
647            name=generate_id(),
648            client_id="test",
649            authorization_flow=create_test_flow(),
650            grant_types=[GrantType.IMPLICIT],
651            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
652        )
653        request = self.factory.get(
654            "/",
655            data={
656                "response_type": "id_token",
657                "client_id": "test",
658                "redirect_uri": "http://localhost",
659                "scope": "",
660            },
661        )
662        with self.assertRaises(AuthorizeError) as cm:
663            OAuthAuthorizationParams.from_request(request)
664        self.assertEqual(cm.exception.cause, "scope_openid_missing")
665
666    @apply_blueprint("system/providers-oauth2.yaml")
667    def test_offline_access_invalid(self):
668        """test request for offline_access with invalid response type"""
669        provider = OAuth2Provider.objects.create(
670            name=generate_id(),
671            client_id="test",
672            authorization_flow=create_test_flow(),
673            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
674            grant_types=[GrantType.IMPLICIT],
675        )
676        provider.property_mappings.set(
677            ScopeMapping.objects.filter(
678                managed__in=[
679                    "goauthentik.io/providers/oauth2/scope-openid",
680                    "goauthentik.io/providers/oauth2/scope-offline_access",
681                ]
682            )
683        )
684        request = self.factory.get(
685            "/",
686            data={
687                "response_type": "id_token",
688                "client_id": "test",
689                "redirect_uri": "http://localhost",
690                "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
691                "nonce": generate_id(),
692            },
693        )
694        parsed = OAuthAuthorizationParams.from_request(request)
695        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)
696
697    @apply_blueprint("default/flow-default-authentication-flow.yaml")
698    def test_ui_locales(self):
699        """Test OIDC ui_locales authorization"""
700        flow = create_test_flow()
701        provider = OAuth2Provider.objects.create(
702            name=generate_id(),
703            client_id="test",
704            authorization_flow=flow,
705            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
706            access_code_validity="seconds=100",
707            grant_types=[GrantType.AUTHORIZATION_CODE],
708        )
709        Application.objects.create(name="app", slug="app", provider=provider)
710        state = generate_id()
711        self.client.logout()
712        try:
713            response = self.client.get(
714                reverse("authentik_providers_oauth2:authorize"),
715                data={
716                    "response_type": "code",
717                    "client_id": "test",
718                    "state": state,
719                    "redirect_uri": "foo://localhost",
720                    "ui_locales": "invalid fr",
721                },
722            )
723            parsed = parse_qs(urlparse(response.url).query)
724            self.assertEqual(parsed["locale"], ["fr"])
725        finally:
726            translation.deactivate()
727
728    @apply_blueprint("default/flow-default-authentication-flow.yaml")
729    def test_ui_locales_invalid(self):
730        """Test OIDC ui_locales authorization"""
731        flow = create_test_flow()
732        provider = OAuth2Provider.objects.create(
733            name=generate_id(),
734            client_id="test",
735            authorization_flow=flow,
736            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
737            access_code_validity="seconds=100",
738            grant_types=[GrantType.AUTHORIZATION_CODE],
739        )
740        Application.objects.create(name="app", slug="app", provider=provider)
741        state = generate_id()
742        self.client.logout()
743        response = self.client.get(
744            reverse("authentik_providers_oauth2:authorize"),
745            data={
746                "response_type": "code",
747                "client_id": "test",
748                "state": state,
749                "redirect_uri": "foo://localhost",
750                "ui_locales": "invalid",
751            },
752        )
753        parsed = parse_qs(urlparse(response.url).query)
754        self.assertNotIn("locale", parsed)
755
756    def test_authentication_flow(self):
757        """Test custom authentication flow"""
758        brand = create_test_brand()
759        global_auth = create_test_flow()
760        FlowStageBinding.objects.create(
761            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
762        )
763        brand.flow_authentication = global_auth
764        brand.save()
765
766        flow = create_test_flow()
767        auth_flow = create_test_flow()
768        FlowStageBinding.objects.create(
769            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
770        )
771        provider = OAuth2Provider.objects.create(
772            name=generate_id(),
773            client_id="test",
774            authorization_flow=flow,
775            authentication_flow=auth_flow,
776            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
777            access_code_validity="seconds=100",
778            grant_types=[GrantType.AUTHORIZATION_CODE],
779        )
780        Application.objects.create(name="app", slug="app", provider=provider)
781        state = generate_id()
782        response = self.client.get(
783            reverse("authentik_providers_oauth2:authorize"),
784            data={
785                "response_type": "code",
786                "client_id": "test",
787                "state": state,
788                "redirect_uri": "foo://localhost",
789            },
790        )
791        self.assertEqual(response.status_code, 302)
792        self.assertIn(auth_flow.slug, response.url)
793        self.assertNotIn(global_auth.slug, response.url)
794
795    @apply_blueprint("default/flow-default-authentication-flow.yaml")
796    def test_login_hint(self):
797        """Login hint"""
798        flow = create_test_flow()
799        provider = OAuth2Provider.objects.create(
800            name=generate_id(),
801            client_id="test",
802            authorization_flow=flow,
803            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
804            access_code_validity="seconds=100",
805            grant_types=[GrantType.AUTHORIZATION_CODE],
806        )
807        Application.objects.create(name="app", slug="app", provider=provider)
808        state = generate_id()
809        response = self.client.get(
810            reverse("authentik_providers_oauth2:authorize"),
811            data={
812                "response_type": "code",
813                "client_id": "test",
814                "state": state,
815                "redirect_uri": "foo://localhost",
816                "login_hint": "foo",
817            },
818        )
819        self.assertEqual(response.status_code, 302)
820        plan = self.client.session.get(SESSION_KEY_PLAN)
821        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")
class TestAuthorize(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 38class TestAuthorize(OAuthTestCase):
 39    """Test authorize view"""
 40
 41    def setUp(self) -> None:
 42        super().setUp()
 43        self.factory = RequestFactory()
 44
 45    def test_disallowed_grant_type(self):
 46        """Test with disallowed grant type"""
 47        OAuth2Provider.objects.create(
 48            name=generate_id(),
 49            client_id="test",
 50            grant_types=[],
 51            authorization_flow=create_test_flow(),
 52            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 53        )
 54        with self.assertRaises(AuthorizeError) as cm:
 55            request = self.factory.get(
 56                "/",
 57                data={
 58                    "response_type": "code",
 59                    "client_id": "test",
 60                    "redirect_uri": "http://local.invalid/Foo",
 61                },
 62            )
 63            OAuthAuthorizationParams.from_request(request)
 64        self.assertEqual(cm.exception.error, "invalid_request")
 65
 66    def test_invalid_grant_type(self):
 67        """Test with invalid grant type"""
 68        OAuth2Provider.objects.create(
 69            name=generate_id(),
 70            client_id="test",
 71            authorization_flow=create_test_flow(),
 72            grant_types=[GrantType.AUTHORIZATION_CODE],
 73            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
 74        )
 75        with self.assertRaises(AuthorizeError) as cm:
 76            request = self.factory.get(
 77                "/",
 78                data={
 79                    "response_type": "invalid",
 80                    "client_id": "test",
 81                    "redirect_uri": "http://local.invalid/Foo",
 82                },
 83            )
 84            OAuthAuthorizationParams.from_request(request)
 85        self.assertEqual(cm.exception.error, "unsupported_response_type")
 86
 87    def test_invalid_client_id(self):
 88        """Test invalid client ID"""
 89        with self.assertRaises(ClientIdError):
 90            request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"})
 91            OAuthAuthorizationParams.from_request(request)
 92
 93    def test_request(self):
 94        """test request param"""
 95        OAuth2Provider.objects.create(
 96            name=generate_id(),
 97            client_id="test",
 98            authorization_flow=create_test_flow(),
 99            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
100            grant_types=[GrantType.AUTHORIZATION_CODE],
101        )
102        with self.assertRaises(AuthorizeError) as cm:
103            request = self.factory.get(
104                "/",
105                data={
106                    "response_type": "code",
107                    "client_id": "test",
108                    "redirect_uri": "http://local.invalid/Foo",
109                    "request": "foo",
110                },
111            )
112            OAuthAuthorizationParams.from_request(request)
113        self.assertEqual(cm.exception.error, "request_not_supported")
114
115    def test_invalid_redirect_uri_missing(self):
116        """test missing redirect URI"""
117        OAuth2Provider.objects.create(
118            name=generate_id(),
119            client_id="test",
120            authorization_flow=create_test_flow(),
121            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
122        )
123        with self.assertRaises(RedirectUriError) as cm:
124            request = self.factory.get("/", data={"response_type": "code", "client_id": "test"})
125            OAuthAuthorizationParams.from_request(request)
126        self.assertEqual(cm.exception.cause, "redirect_uri_missing")
127
128    def test_invalid_redirect_uri(self):
129        """test invalid redirect URI"""
130        OAuth2Provider.objects.create(
131            name=generate_id(),
132            client_id="test",
133            authorization_flow=create_test_flow(),
134            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
135        )
136        with self.assertRaises(RedirectUriError) as cm:
137            request = self.factory.get(
138                "/",
139                data={
140                    "response_type": "code",
141                    "client_id": "test",
142                    "redirect_uri": "http://localhost",
143                },
144            )
145            OAuthAuthorizationParams.from_request(request)
146        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
147
148    def test_blocked_redirect_uri(self):
149        """test missing/invalid redirect URI"""
150        OAuth2Provider.objects.create(
151            name=generate_id(),
152            client_id="test",
153            authorization_flow=create_test_flow(),
154            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
155        )
156        with self.assertRaises(RedirectUriError) as cm:
157            request = self.factory.get(
158                "/",
159                data={
160                    "response_type": "code",
161                    "client_id": "test",
162                    "redirect_uri": "data:localhost",
163                },
164            )
165            OAuthAuthorizationParams.from_request(request)
166        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")
167
168    def test_invalid_redirect_uri_regex(self):
169        """test missing/invalid redirect URI"""
170        OAuth2Provider.objects.create(
171            name=generate_id(),
172            client_id="test",
173            authorization_flow=create_test_flow(),
174            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
175        )
176        with self.assertRaises(RedirectUriError) as cm:
177            request = self.factory.get(
178                "/",
179                data={
180                    "response_type": "code",
181                    "client_id": "test",
182                    "redirect_uri": "http://localhost",
183                },
184            )
185            OAuthAuthorizationParams.from_request(request)
186        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
187
188    def test_redirect_uri_invalid_regex(self):
189        """test missing/invalid redirect URI (invalid regex)"""
190        OAuth2Provider.objects.create(
191            name=generate_id(),
192            client_id="test",
193            authorization_flow=create_test_flow(),
194            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
195        )
196        with self.assertRaises(RedirectUriError) as cm:
197            request = self.factory.get(
198                "/",
199                data={
200                    "response_type": "code",
201                    "client_id": "test",
202                    "redirect_uri": "http://localhost",
203                },
204            )
205            OAuthAuthorizationParams.from_request(request)
206        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")
207
208    def test_redirect_uri_regex(self):
209        """test valid redirect URI (regex)"""
210        OAuth2Provider.objects.create(
211            name=generate_id(),
212            client_id="test",
213            authorization_flow=create_test_flow(),
214            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
215            grant_types=[GrantType.AUTHORIZATION_CODE],
216        )
217        request = self.factory.get(
218            "/",
219            data={
220                "response_type": "code",
221                "client_id": "test",
222                "redirect_uri": "http://foo.bar.baz",
223            },
224        )
225        OAuthAuthorizationParams.from_request(request)
226
227    @apply_blueprint("system/providers-oauth2.yaml")
228    def test_response_type(self):
229        """test response_type"""
230        provider = OAuth2Provider.objects.create(
231            name=generate_id(),
232            client_id="test",
233            authorization_flow=create_test_flow(),
234            grant_types=[GrantType.AUTHORIZATION_CODE],
235            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
236        )
237        provider.property_mappings.set(
238            ScopeMapping.objects.filter(
239                managed__in=[
240                    "goauthentik.io/providers/oauth2/scope-openid",
241                    "goauthentik.io/providers/oauth2/scope-email",
242                    "goauthentik.io/providers/oauth2/scope-profile",
243                ]
244            )
245        )
246        request = self.factory.get(
247            "/",
248            data={
249                "response_type": "code",
250                "client_id": "test",
251                "redirect_uri": "http://local.invalid/Foo",
252            },
253        )
254        self.assertEqual(
255            OAuthAuthorizationParams.from_request(request).grant_type,
256            GrantType.AUTHORIZATION_CODE,
257        )
258        self.assertEqual(
259            OAuthAuthorizationParams.from_request(request).redirect_uri,
260            "http://local.invalid/Foo",
261        )
262        provider.grant_types = [GrantType.IMPLICIT]
263        provider.save()
264        request = self.factory.get(
265            "/",
266            data={
267                "response_type": "id_token",
268                "client_id": "test",
269                "redirect_uri": "http://local.invalid/Foo",
270                "scope": "openid",
271                "state": "foo",
272                "nonce": generate_id(),
273            },
274        )
275        self.assertEqual(
276            OAuthAuthorizationParams.from_request(request).grant_type,
277            GrantType.IMPLICIT,
278        )
279        # Implicit without openid scope
280        with self.assertRaises(AuthorizeError) as cm:
281            request = self.factory.get(
282                "/",
283                data={
284                    "response_type": "id_token",
285                    "client_id": "test",
286                    "redirect_uri": "http://local.invalid/Foo",
287                    "state": "foo",
288                },
289            )
290            self.assertEqual(
291                OAuthAuthorizationParams.from_request(request).grant_type,
292                GrantType.IMPLICIT,
293            )
294        provider.grant_types = [GrantType.HYBRID]
295        provider.save()
296        request = self.factory.get(
297            "/",
298            data={
299                "response_type": "code token",
300                "client_id": "test",
301                "redirect_uri": "http://local.invalid/Foo",
302                "scope": "openid",
303                "state": "foo",
304            },
305        )
306        self.assertEqual(
307            OAuthAuthorizationParams.from_request(request).grant_type, GrantType.HYBRID
308        )
309        with self.assertRaises(AuthorizeError) as cm:
310            request = self.factory.get(
311                "/",
312                data={
313                    "response_type": "invalid",
314                    "client_id": "test",
315                    "redirect_uri": "http://local.invalid/Foo",
316                },
317            )
318            OAuthAuthorizationParams.from_request(request)
319        self.assertEqual(cm.exception.error, "unsupported_response_type")
320
321    def test_full_code(self):
322        """Test full authorization"""
323        flow = create_test_flow()
324        provider = OAuth2Provider.objects.create(
325            name=generate_id(),
326            client_id="test",
327            authorization_flow=flow,
328            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
329            access_code_validity="seconds=100",
330            grant_types=[GrantType.AUTHORIZATION_CODE],
331        )
332        Application.objects.create(name="app", slug="app", provider=provider)
333        state = generate_id()
334        user = create_test_admin_user()
335        self.client.force_login(user)
336        # Step 1, initiate params and get redirect to flow
337        response = self.client.get(
338            reverse("authentik_providers_oauth2:authorize"),
339            data={
340                "response_type": "code",
341                "client_id": "test",
342                "state": state,
343                "redirect_uri": "foo://localhost",
344            },
345        )
346        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
347        self.assertEqual(
348            response.url,
349            f"foo://localhost?code={code.code}&state={state}",
350        )
351        self.assertAlmostEqual(
352            code.expires.timestamp() - now().timestamp(),
353            timedelta_from_string(provider.access_code_validity).total_seconds(),
354            delta=5,
355        )
356
357    @apply_blueprint("system/providers-oauth2.yaml")
358    def test_full_implicit(self):
359        """Test full authorization"""
360        flow = create_test_flow()
361        provider: OAuth2Provider = OAuth2Provider.objects.create(
362            name=generate_id(),
363            client_id="test",
364            authorization_flow=flow,
365            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
366            signing_key=self.keypair,
367            grant_types=[GrantType.IMPLICIT],
368        )
369        provider.property_mappings.set(
370            ScopeMapping.objects.filter(
371                managed__in=[
372                    "goauthentik.io/providers/oauth2/scope-openid",
373                    "goauthentik.io/providers/oauth2/scope-email",
374                    "goauthentik.io/providers/oauth2/scope-profile",
375                ]
376            )
377        )
378        provider.property_mappings.add(
379            ScopeMapping.objects.create(
380                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
381            )
382        )
383        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
384        state = generate_id()
385        user = create_test_admin_user()
386        self.client.force_login(user)
387        with patch(
388            "authentik.providers.oauth2.id_token.get_login_event",
389            MagicMock(
390                return_value=Event(
391                    action=EventAction.LOGIN,
392                    context={PLAN_CONTEXT_METHOD: "password"},
393                    created=now(),
394                )
395            ),
396        ):
397            # Step 1, initiate params and get redirect to flow
398            response = self.client.get(
399                reverse("authentik_providers_oauth2:authorize"),
400                data={
401                    "response_type": "id_token",
402                    "client_id": "test",
403                    "state": state,
404                    "scope": "openid test",
405                    "redirect_uri": "http://localhost",
406                    "nonce": generate_id(),
407                },
408            )
409            token = AccessToken.objects.filter(user=user).first()
410            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
411            self.assertEqual(
412                response.url,
413                (
414                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
415                    f"&token_type={TOKEN_TYPE}"
416                    f"&expires_in={int(expires)}&state={state}"
417                ),
418            )
419            jwt = self.validate_jwt(token, provider)
420            self.assertEqual(jwt["amr"], ["pwd"])
421            self.assertEqual(jwt["sub"], "foo")
422            self.assertAlmostEqual(
423                jwt["exp"] - now().timestamp(),
424                expires,
425                delta=5,
426            )
427
428    @apply_blueprint("system/providers-oauth2.yaml")
429    def test_full_implicit_enc(self):
430        """Test full authorization with encryption"""
431        flow = create_test_flow()
432        provider: OAuth2Provider = OAuth2Provider.objects.create(
433            name=generate_id(),
434            client_id="test",
435            authorization_flow=flow,
436            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
437            signing_key=self.keypair,
438            encryption_key=self.keypair,
439            grant_types=[GrantType.IMPLICIT],
440        )
441        provider.property_mappings.set(
442            ScopeMapping.objects.filter(
443                managed__in=[
444                    "goauthentik.io/providers/oauth2/scope-openid",
445                    "goauthentik.io/providers/oauth2/scope-email",
446                    "goauthentik.io/providers/oauth2/scope-profile",
447                ]
448            )
449        )
450        provider.property_mappings.add(
451            ScopeMapping.objects.create(
452                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
453            )
454        )
455        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
456        state = generate_id()
457        user = create_test_admin_user()
458        self.client.force_login(user)
459        with patch(
460            "authentik.providers.oauth2.id_token.get_login_event",
461            MagicMock(
462                return_value=Event(
463                    action=EventAction.LOGIN,
464                    context={PLAN_CONTEXT_METHOD: "password"},
465                    created=now(),
466                )
467            ),
468        ):
469            # Step 1, initiate params and get redirect to flow
470            response = self.client.get(
471                reverse("authentik_providers_oauth2:authorize"),
472                data={
473                    "response_type": "id_token",
474                    "client_id": "test",
475                    "state": state,
476                    "scope": "openid test",
477                    "redirect_uri": "http://localhost",
478                    "nonce": generate_id(),
479                },
480            )
481            self.assertEqual(response.status_code, 302)
482            token = AccessToken.objects.filter(user=user).first()
483            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
484            jwt = self.validate_jwe(token, provider)
485            self.assertEqual(jwt["amr"], ["pwd"])
486            self.assertEqual(jwt["sub"], "foo")
487            self.assertAlmostEqual(
488                jwt["exp"] - now().timestamp(),
489                expires,
490                delta=5,
491            )
492
493    def test_full_fragment_code(self):
494        """Test full authorization"""
495        flow = create_test_flow()
496        provider: OAuth2Provider = OAuth2Provider.objects.create(
497            name=generate_id(),
498            client_id="test",
499            authorization_flow=flow,
500            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
501            signing_key=self.keypair,
502            grant_types=[GrantType.AUTHORIZATION_CODE],
503        )
504        Application.objects.create(name="app", slug="app", provider=provider)
505        state = generate_id()
506        user = create_test_admin_user()
507        self.client.force_login(user)
508        with patch(
509            "authentik.providers.oauth2.id_token.get_login_event",
510            MagicMock(
511                return_value=Event(
512                    action=EventAction.LOGIN,
513                    context={PLAN_CONTEXT_METHOD: "password"},
514                    created=now(),
515                )
516            ),
517        ):
518            # Step 1, initiate params and get redirect to flow
519            response = self.client.get(
520                reverse("authentik_providers_oauth2:authorize"),
521                data={
522                    "response_type": "code",
523                    "response_mode": "fragment",
524                    "client_id": "test",
525                    "state": state,
526                    "scope": "openid",
527                    "redirect_uri": "http://localhost",
528                    "nonce": generate_id(),
529                },
530            )
531            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
532            self.assertEqual(
533                response.url,
534                f"http://localhost#code={code.code}&state={state}",
535            )
536            self.assertAlmostEqual(
537                code.expires.timestamp() - now().timestamp(),
538                timedelta_from_string(provider.access_code_validity).total_seconds(),
539                delta=5,
540            )
541
542    @apply_blueprint("system/providers-oauth2.yaml")
543    def test_full_form_post_id_token(self):
544        """Test full authorization (form_post response)"""
545        flow = create_test_flow()
546        provider = OAuth2Provider.objects.create(
547            name=generate_id(),
548            client_id=generate_id(),
549            authorization_flow=flow,
550            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
551            signing_key=self.keypair,
552            grant_types=[GrantType.IMPLICIT],
553        )
554        provider.property_mappings.set(
555            ScopeMapping.objects.filter(
556                managed__in=[
557                    "goauthentik.io/providers/oauth2/scope-openid",
558                    "goauthentik.io/providers/oauth2/scope-email",
559                    "goauthentik.io/providers/oauth2/scope-profile",
560                ]
561            )
562        )
563        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
564        state = generate_id()
565        user = create_test_admin_user()
566        self.client.force_login(user)
567        # Step 1, initiate params and get redirect to flow
568        self.client.get(
569            reverse("authentik_providers_oauth2:authorize"),
570            data={
571                "response_type": "id_token",
572                "response_mode": "form_post",
573                "client_id": provider.client_id,
574                "state": state,
575                "scope": "openid",
576                "redirect_uri": "http://localhost",
577                "nonce": generate_id(),
578            },
579        )
580        response = self.client.get(
581            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
582        )
583        token = AccessToken.objects.filter(user=user).first()
584        self.assertIsNotNone(token)
585        self.assertJSONEqual(
586            response.content.decode(),
587            {
588                "component": "ak-stage-autosubmit",
589                "url": "http://localhost",
590                "title": f"Redirecting to {app.name}...",
591                "attrs": {
592                    "id_token": provider.encode(token.id_token.to_dict()),
593                    "token_type": TOKEN_TYPE,
594                    "expires_in": "3600",
595                    "state": state,
596                },
597            },
598        )
599        self.validate_jwt(token, provider)
600
601    def test_full_form_post_code(self):
602        """Test full authorization (form_post response, code type)"""
603        flow = create_test_flow()
604        provider = OAuth2Provider.objects.create(
605            name=generate_id(),
606            client_id=generate_id(),
607            authorization_flow=flow,
608            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
609            signing_key=self.keypair,
610            grant_types=[GrantType.AUTHORIZATION_CODE],
611        )
612        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
613        state = generate_id()
614        user = create_test_admin_user()
615        self.client.force_login(user)
616        # Step 1, initiate params and get redirect to flow
617        self.client.get(
618            reverse("authentik_providers_oauth2:authorize"),
619            data={
620                "response_type": "code",
621                "response_mode": "form_post",
622                "client_id": provider.client_id,
623                "state": state,
624                "scope": "openid",
625                "redirect_uri": "http://localhost",
626            },
627        )
628        response = self.client.get(
629            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
630        )
631        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
632        self.assertJSONEqual(
633            response.content.decode(),
634            {
635                "component": "ak-stage-autosubmit",
636                "url": "http://localhost",
637                "title": f"Redirecting to {app.name}...",
638                "attrs": {
639                    "code": code.code,
640                    "state": state,
641                },
642            },
643        )
644
645    def test_openid_missing_invalid(self):
646        """test request requiring an OpenID scope to be set"""
647        OAuth2Provider.objects.create(
648            name=generate_id(),
649            client_id="test",
650            authorization_flow=create_test_flow(),
651            grant_types=[GrantType.IMPLICIT],
652            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
653        )
654        request = self.factory.get(
655            "/",
656            data={
657                "response_type": "id_token",
658                "client_id": "test",
659                "redirect_uri": "http://localhost",
660                "scope": "",
661            },
662        )
663        with self.assertRaises(AuthorizeError) as cm:
664            OAuthAuthorizationParams.from_request(request)
665        self.assertEqual(cm.exception.cause, "scope_openid_missing")
666
667    @apply_blueprint("system/providers-oauth2.yaml")
668    def test_offline_access_invalid(self):
669        """test request for offline_access with invalid response type"""
670        provider = OAuth2Provider.objects.create(
671            name=generate_id(),
672            client_id="test",
673            authorization_flow=create_test_flow(),
674            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
675            grant_types=[GrantType.IMPLICIT],
676        )
677        provider.property_mappings.set(
678            ScopeMapping.objects.filter(
679                managed__in=[
680                    "goauthentik.io/providers/oauth2/scope-openid",
681                    "goauthentik.io/providers/oauth2/scope-offline_access",
682                ]
683            )
684        )
685        request = self.factory.get(
686            "/",
687            data={
688                "response_type": "id_token",
689                "client_id": "test",
690                "redirect_uri": "http://localhost",
691                "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
692                "nonce": generate_id(),
693            },
694        )
695        parsed = OAuthAuthorizationParams.from_request(request)
696        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)
697
698    @apply_blueprint("default/flow-default-authentication-flow.yaml")
699    def test_ui_locales(self):
700        """Test OIDC ui_locales authorization"""
701        flow = create_test_flow()
702        provider = OAuth2Provider.objects.create(
703            name=generate_id(),
704            client_id="test",
705            authorization_flow=flow,
706            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
707            access_code_validity="seconds=100",
708            grant_types=[GrantType.AUTHORIZATION_CODE],
709        )
710        Application.objects.create(name="app", slug="app", provider=provider)
711        state = generate_id()
712        self.client.logout()
713        try:
714            response = self.client.get(
715                reverse("authentik_providers_oauth2:authorize"),
716                data={
717                    "response_type": "code",
718                    "client_id": "test",
719                    "state": state,
720                    "redirect_uri": "foo://localhost",
721                    "ui_locales": "invalid fr",
722                },
723            )
724            parsed = parse_qs(urlparse(response.url).query)
725            self.assertEqual(parsed["locale"], ["fr"])
726        finally:
727            translation.deactivate()
728
729    @apply_blueprint("default/flow-default-authentication-flow.yaml")
730    def test_ui_locales_invalid(self):
731        """Test OIDC ui_locales authorization"""
732        flow = create_test_flow()
733        provider = OAuth2Provider.objects.create(
734            name=generate_id(),
735            client_id="test",
736            authorization_flow=flow,
737            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
738            access_code_validity="seconds=100",
739            grant_types=[GrantType.AUTHORIZATION_CODE],
740        )
741        Application.objects.create(name="app", slug="app", provider=provider)
742        state = generate_id()
743        self.client.logout()
744        response = self.client.get(
745            reverse("authentik_providers_oauth2:authorize"),
746            data={
747                "response_type": "code",
748                "client_id": "test",
749                "state": state,
750                "redirect_uri": "foo://localhost",
751                "ui_locales": "invalid",
752            },
753        )
754        parsed = parse_qs(urlparse(response.url).query)
755        self.assertNotIn("locale", parsed)
756
757    def test_authentication_flow(self):
758        """Test custom authentication flow"""
759        brand = create_test_brand()
760        global_auth = create_test_flow()
761        FlowStageBinding.objects.create(
762            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
763        )
764        brand.flow_authentication = global_auth
765        brand.save()
766
767        flow = create_test_flow()
768        auth_flow = create_test_flow()
769        FlowStageBinding.objects.create(
770            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
771        )
772        provider = OAuth2Provider.objects.create(
773            name=generate_id(),
774            client_id="test",
775            authorization_flow=flow,
776            authentication_flow=auth_flow,
777            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
778            access_code_validity="seconds=100",
779            grant_types=[GrantType.AUTHORIZATION_CODE],
780        )
781        Application.objects.create(name="app", slug="app", provider=provider)
782        state = generate_id()
783        response = self.client.get(
784            reverse("authentik_providers_oauth2:authorize"),
785            data={
786                "response_type": "code",
787                "client_id": "test",
788                "state": state,
789                "redirect_uri": "foo://localhost",
790            },
791        )
792        self.assertEqual(response.status_code, 302)
793        self.assertIn(auth_flow.slug, response.url)
794        self.assertNotIn(global_auth.slug, response.url)
795
796    @apply_blueprint("default/flow-default-authentication-flow.yaml")
797    def test_login_hint(self):
798        """Login hint"""
799        flow = create_test_flow()
800        provider = OAuth2Provider.objects.create(
801            name=generate_id(),
802            client_id="test",
803            authorization_flow=flow,
804            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
805            access_code_validity="seconds=100",
806            grant_types=[GrantType.AUTHORIZATION_CODE],
807        )
808        Application.objects.create(name="app", slug="app", provider=provider)
809        state = generate_id()
810        response = self.client.get(
811            reverse("authentik_providers_oauth2:authorize"),
812            data={
813                "response_type": "code",
814                "client_id": "test",
815                "state": state,
816                "redirect_uri": "foo://localhost",
817                "login_hint": "foo",
818            },
819        )
820        self.assertEqual(response.status_code, 302)
821        plan = self.client.session.get(SESSION_KEY_PLAN)
822        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")

Test authorize view

def setUp(self) -> None:
    def setUp(self) -> None:
        """Run the parent fixture setup, then create the request factory used by the tests"""
        super().setUp()
        # Tests use this to build bare GET requests that are parsed without the test client
        self.factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_disallowed_grant_type(self):
45    def test_disallowed_grant_type(self):
46        """Test with disallowed grant type"""
47        OAuth2Provider.objects.create(
48            name=generate_id(),
49            client_id="test",
50            grant_types=[],
51            authorization_flow=create_test_flow(),
52            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
53        )
54        with self.assertRaises(AuthorizeError) as cm:
55            request = self.factory.get(
56                "/",
57                data={
58                    "response_type": "code",
59                    "client_id": "test",
60                    "redirect_uri": "http://local.invalid/Foo",
61                },
62            )
63            OAuthAuthorizationParams.from_request(request)
64        self.assertEqual(cm.exception.error, "invalid_request")

Test with disallowed grant type

def test_invalid_grant_type(self):
66    def test_invalid_grant_type(self):
67        """Test with invalid grant type"""
68        OAuth2Provider.objects.create(
69            name=generate_id(),
70            client_id="test",
71            authorization_flow=create_test_flow(),
72            grant_types=[GrantType.AUTHORIZATION_CODE],
73            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
74        )
75        with self.assertRaises(AuthorizeError) as cm:
76            request = self.factory.get(
77                "/",
78                data={
79                    "response_type": "invalid",
80                    "client_id": "test",
81                    "redirect_uri": "http://local.invalid/Foo",
82                },
83            )
84            OAuthAuthorizationParams.from_request(request)
85        self.assertEqual(cm.exception.error, "unsupported_response_type")

Test with invalid response type (reported as `unsupported_response_type`)

def test_invalid_client_id(self):
87    def test_invalid_client_id(self):
88        """Test invalid client ID"""
89        with self.assertRaises(ClientIdError):
90            request = self.factory.get("/", data={"response_type": "code", "client_id": "invalid"})
91            OAuthAuthorizationParams.from_request(request)

Test invalid client ID

def test_request(self):
 93    def test_request(self):
 94        """test request param"""
 95        OAuth2Provider.objects.create(
 96            name=generate_id(),
 97            client_id="test",
 98            authorization_flow=create_test_flow(),
 99            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
100            grant_types=[GrantType.AUTHORIZATION_CODE],
101        )
102        with self.assertRaises(AuthorizeError) as cm:
103            request = self.factory.get(
104                "/",
105                data={
106                    "response_type": "code",
107                    "client_id": "test",
108                    "redirect_uri": "http://local.invalid/Foo",
109                    "request": "foo",
110                },
111            )
112            OAuthAuthorizationParams.from_request(request)
113        self.assertEqual(cm.exception.error, "request_not_supported")

test request param

def test_invalid_redirect_uri_missing(self):
115    def test_invalid_redirect_uri_missing(self):
116        """test missing redirect URI"""
117        OAuth2Provider.objects.create(
118            name=generate_id(),
119            client_id="test",
120            authorization_flow=create_test_flow(),
121            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
122        )
123        with self.assertRaises(RedirectUriError) as cm:
124            request = self.factory.get("/", data={"response_type": "code", "client_id": "test"})
125            OAuthAuthorizationParams.from_request(request)
126        self.assertEqual(cm.exception.cause, "redirect_uri_missing")

test missing redirect URI

def test_invalid_redirect_uri(self):
128    def test_invalid_redirect_uri(self):
129        """test invalid redirect URI"""
130        OAuth2Provider.objects.create(
131            name=generate_id(),
132            client_id="test",
133            authorization_flow=create_test_flow(),
134            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid")],
135        )
136        with self.assertRaises(RedirectUriError) as cm:
137            request = self.factory.get(
138                "/",
139                data={
140                    "response_type": "code",
141                    "client_id": "test",
142                    "redirect_uri": "http://localhost",
143                },
144            )
145            OAuthAuthorizationParams.from_request(request)
146        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

test invalid redirect URI

def test_blocked_redirect_uri(self):
148    def test_blocked_redirect_uri(self):
149        """test missing/invalid redirect URI"""
150        OAuth2Provider.objects.create(
151            name=generate_id(),
152            client_id="test",
153            authorization_flow=create_test_flow(),
154            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "data:localhost")],
155        )
156        with self.assertRaises(RedirectUriError) as cm:
157            request = self.factory.get(
158                "/",
159                data={
160                    "response_type": "code",
161                    "client_id": "test",
162                    "redirect_uri": "data:localhost",
163                },
164            )
165            OAuthAuthorizationParams.from_request(request)
166        self.assertEqual(cm.exception.cause, "redirect_uri_forbidden_scheme")

test redirect URI with a forbidden scheme (`data:`)

def test_invalid_redirect_uri_regex(self):
168    def test_invalid_redirect_uri_regex(self):
169        """test missing/invalid redirect URI"""
170        OAuth2Provider.objects.create(
171            name=generate_id(),
172            client_id="test",
173            authorization_flow=create_test_flow(),
174            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "http://local.invalid?")],
175        )
176        with self.assertRaises(RedirectUriError) as cm:
177            request = self.factory.get(
178                "/",
179                data={
180                    "response_type": "code",
181                    "client_id": "test",
182                    "redirect_uri": "http://localhost",
183                },
184            )
185            OAuthAuthorizationParams.from_request(request)
186        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

test redirect URI that does not match the registered regex

def test_redirect_uri_invalid_regex(self):
188    def test_redirect_uri_invalid_regex(self):
189        """test missing/invalid redirect URI (invalid regex)"""
190        OAuth2Provider.objects.create(
191            name=generate_id(),
192            client_id="test",
193            authorization_flow=create_test_flow(),
194            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, "+")],
195        )
196        with self.assertRaises(RedirectUriError) as cm:
197            request = self.factory.get(
198                "/",
199                data={
200                    "response_type": "code",
201                    "client_id": "test",
202                    "redirect_uri": "http://localhost",
203                },
204            )
205            OAuthAuthorizationParams.from_request(request)
206        self.assertEqual(cm.exception.cause, "redirect_uri_no_match")

test missing/invalid redirect URI (invalid regex)

def test_redirect_uri_regex(self):
208    def test_redirect_uri_regex(self):
209        """test valid redirect URI (regex)"""
210        OAuth2Provider.objects.create(
211            name=generate_id(),
212            client_id="test",
213            authorization_flow=create_test_flow(),
214            redirect_uris=[RedirectURI(RedirectURIMatchingMode.REGEX, ".+")],
215            grant_types=[GrantType.AUTHORIZATION_CODE],
216        )
217        request = self.factory.get(
218            "/",
219            data={
220                "response_type": "code",
221                "client_id": "test",
222                "redirect_uri": "http://foo.bar.baz",
223            },
224        )
225        OAuthAuthorizationParams.from_request(request)

test valid redirect URI (regex)

@apply_blueprint('system/providers-oauth2.yaml')
def test_response_type(self):
227    @apply_blueprint("system/providers-oauth2.yaml")
228    def test_response_type(self):
229        """test response_type"""
230        provider = OAuth2Provider.objects.create(
231            name=generate_id(),
232            client_id="test",
233            authorization_flow=create_test_flow(),
234            grant_types=[GrantType.AUTHORIZATION_CODE],
235            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://local.invalid/Foo")],
236        )
237        provider.property_mappings.set(
238            ScopeMapping.objects.filter(
239                managed__in=[
240                    "goauthentik.io/providers/oauth2/scope-openid",
241                    "goauthentik.io/providers/oauth2/scope-email",
242                    "goauthentik.io/providers/oauth2/scope-profile",
243                ]
244            )
245        )
246        request = self.factory.get(
247            "/",
248            data={
249                "response_type": "code",
250                "client_id": "test",
251                "redirect_uri": "http://local.invalid/Foo",
252            },
253        )
254        self.assertEqual(
255            OAuthAuthorizationParams.from_request(request).grant_type,
256            GrantType.AUTHORIZATION_CODE,
257        )
258        self.assertEqual(
259            OAuthAuthorizationParams.from_request(request).redirect_uri,
260            "http://local.invalid/Foo",
261        )
262        provider.grant_types = [GrantType.IMPLICIT]
263        provider.save()
264        request = self.factory.get(
265            "/",
266            data={
267                "response_type": "id_token",
268                "client_id": "test",
269                "redirect_uri": "http://local.invalid/Foo",
270                "scope": "openid",
271                "state": "foo",
272                "nonce": generate_id(),
273            },
274        )
275        self.assertEqual(
276            OAuthAuthorizationParams.from_request(request).grant_type,
277            GrantType.IMPLICIT,
278        )
279        # Implicit without openid scope
280        with self.assertRaises(AuthorizeError) as cm:
281            request = self.factory.get(
282                "/",
283                data={
284                    "response_type": "id_token",
285                    "client_id": "test",
286                    "redirect_uri": "http://local.invalid/Foo",
287                    "state": "foo",
288                },
289            )
290            self.assertEqual(
291                OAuthAuthorizationParams.from_request(request).grant_type,
292                GrantType.IMPLICIT,
293            )
294        provider.grant_types = [GrantType.HYBRID]
295        provider.save()
296        request = self.factory.get(
297            "/",
298            data={
299                "response_type": "code token",
300                "client_id": "test",
301                "redirect_uri": "http://local.invalid/Foo",
302                "scope": "openid",
303                "state": "foo",
304            },
305        )
306        self.assertEqual(
307            OAuthAuthorizationParams.from_request(request).grant_type, GrantType.HYBRID
308        )
309        with self.assertRaises(AuthorizeError) as cm:
310            request = self.factory.get(
311                "/",
312                data={
313                    "response_type": "invalid",
314                    "client_id": "test",
315                    "redirect_uri": "http://local.invalid/Foo",
316                },
317            )
318            OAuthAuthorizationParams.from_request(request)
319        self.assertEqual(cm.exception.error, "unsupported_response_type")

test response_type

def test_full_code(self):
321    def test_full_code(self):
322        """Test full authorization"""
323        flow = create_test_flow()
324        provider = OAuth2Provider.objects.create(
325            name=generate_id(),
326            client_id="test",
327            authorization_flow=flow,
328            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
329            access_code_validity="seconds=100",
330            grant_types=[GrantType.AUTHORIZATION_CODE],
331        )
332        Application.objects.create(name="app", slug="app", provider=provider)
333        state = generate_id()
334        user = create_test_admin_user()
335        self.client.force_login(user)
336        # Step 1, initiate params and get redirect to flow
337        response = self.client.get(
338            reverse("authentik_providers_oauth2:authorize"),
339            data={
340                "response_type": "code",
341                "client_id": "test",
342                "state": state,
343                "redirect_uri": "foo://localhost",
344            },
345        )
346        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
347        self.assertEqual(
348            response.url,
349            f"foo://localhost?code={code.code}&state={state}",
350        )
351        self.assertAlmostEqual(
352            code.expires.timestamp() - now().timestamp(),
353            timedelta_from_string(provider.access_code_validity).total_seconds(),
354            delta=5,
355        )

Test full authorization

@apply_blueprint('system/providers-oauth2.yaml')
def test_full_implicit(self):
357    @apply_blueprint("system/providers-oauth2.yaml")
358    def test_full_implicit(self):
359        """Test full authorization"""
360        flow = create_test_flow()
361        provider: OAuth2Provider = OAuth2Provider.objects.create(
362            name=generate_id(),
363            client_id="test",
364            authorization_flow=flow,
365            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
366            signing_key=self.keypair,
367            grant_types=[GrantType.IMPLICIT],
368        )
369        provider.property_mappings.set(
370            ScopeMapping.objects.filter(
371                managed__in=[
372                    "goauthentik.io/providers/oauth2/scope-openid",
373                    "goauthentik.io/providers/oauth2/scope-email",
374                    "goauthentik.io/providers/oauth2/scope-profile",
375                ]
376            )
377        )
378        provider.property_mappings.add(
379            ScopeMapping.objects.create(
380                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
381            )
382        )
383        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
384        state = generate_id()
385        user = create_test_admin_user()
386        self.client.force_login(user)
387        with patch(
388            "authentik.providers.oauth2.id_token.get_login_event",
389            MagicMock(
390                return_value=Event(
391                    action=EventAction.LOGIN,
392                    context={PLAN_CONTEXT_METHOD: "password"},
393                    created=now(),
394                )
395            ),
396        ):
397            # Step 1, initiate params and get redirect to flow
398            response = self.client.get(
399                reverse("authentik_providers_oauth2:authorize"),
400                data={
401                    "response_type": "id_token",
402                    "client_id": "test",
403                    "state": state,
404                    "scope": "openid test",
405                    "redirect_uri": "http://localhost",
406                    "nonce": generate_id(),
407                },
408            )
409            token = AccessToken.objects.filter(user=user).first()
410            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
411            self.assertEqual(
412                response.url,
413                (
414                    f"http://localhost#id_token={provider.encode(token.id_token.to_dict())}"
415                    f"&token_type={TOKEN_TYPE}"
416                    f"&expires_in={int(expires)}&state={state}"
417                ),
418            )
419            jwt = self.validate_jwt(token, provider)
420            self.assertEqual(jwt["amr"], ["pwd"])
421            self.assertEqual(jwt["sub"], "foo")
422            self.assertAlmostEqual(
423                jwt["exp"] - now().timestamp(),
424                expires,
425                delta=5,
426            )

Test full authorization

@apply_blueprint('system/providers-oauth2.yaml')
def test_full_implicit_enc(self):
428    @apply_blueprint("system/providers-oauth2.yaml")
429    def test_full_implicit_enc(self):
430        """Test full authorization with encryption"""
431        flow = create_test_flow()
432        provider: OAuth2Provider = OAuth2Provider.objects.create(
433            name=generate_id(),
434            client_id="test",
435            authorization_flow=flow,
436            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
437            signing_key=self.keypair,
438            encryption_key=self.keypair,
439            grant_types=[GrantType.IMPLICIT],
440        )
441        provider.property_mappings.set(
442            ScopeMapping.objects.filter(
443                managed__in=[
444                    "goauthentik.io/providers/oauth2/scope-openid",
445                    "goauthentik.io/providers/oauth2/scope-email",
446                    "goauthentik.io/providers/oauth2/scope-profile",
447                ]
448            )
449        )
450        provider.property_mappings.add(
451            ScopeMapping.objects.create(
452                name=generate_id(), scope_name="test", expression="""return {"sub": "foo"}"""
453            )
454        )
455        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
456        state = generate_id()
457        user = create_test_admin_user()
458        self.client.force_login(user)
459        with patch(
460            "authentik.providers.oauth2.id_token.get_login_event",
461            MagicMock(
462                return_value=Event(
463                    action=EventAction.LOGIN,
464                    context={PLAN_CONTEXT_METHOD: "password"},
465                    created=now(),
466                )
467            ),
468        ):
469            # Step 1, initiate params and get redirect to flow
470            response = self.client.get(
471                reverse("authentik_providers_oauth2:authorize"),
472                data={
473                    "response_type": "id_token",
474                    "client_id": "test",
475                    "state": state,
476                    "scope": "openid test",
477                    "redirect_uri": "http://localhost",
478                    "nonce": generate_id(),
479                },
480            )
481            self.assertEqual(response.status_code, 302)
482            token = AccessToken.objects.filter(user=user).first()
483            expires = timedelta_from_string(provider.access_token_validity).total_seconds()
484            jwt = self.validate_jwe(token, provider)
485            self.assertEqual(jwt["amr"], ["pwd"])
486            self.assertEqual(jwt["sub"], "foo")
487            self.assertAlmostEqual(
488                jwt["exp"] - now().timestamp(),
489                expires,
490                delta=5,
491            )

Test full authorization with encryption

def test_full_fragment_code(self):
493    def test_full_fragment_code(self):
494        """Test full authorization"""
495        flow = create_test_flow()
496        provider: OAuth2Provider = OAuth2Provider.objects.create(
497            name=generate_id(),
498            client_id="test",
499            authorization_flow=flow,
500            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
501            signing_key=self.keypair,
502            grant_types=[GrantType.AUTHORIZATION_CODE],
503        )
504        Application.objects.create(name="app", slug="app", provider=provider)
505        state = generate_id()
506        user = create_test_admin_user()
507        self.client.force_login(user)
508        with patch(
509            "authentik.providers.oauth2.id_token.get_login_event",
510            MagicMock(
511                return_value=Event(
512                    action=EventAction.LOGIN,
513                    context={PLAN_CONTEXT_METHOD: "password"},
514                    created=now(),
515                )
516            ),
517        ):
518            # Step 1, initiate params and get redirect to flow
519            response = self.client.get(
520                reverse("authentik_providers_oauth2:authorize"),
521                data={
522                    "response_type": "code",
523                    "response_mode": "fragment",
524                    "client_id": "test",
525                    "state": state,
526                    "scope": "openid",
527                    "redirect_uri": "http://localhost",
528                    "nonce": generate_id(),
529                },
530            )
531            code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
532            self.assertEqual(
533                response.url,
534                f"http://localhost#code={code.code}&state={state}",
535            )
536            self.assertAlmostEqual(
537                code.expires.timestamp() - now().timestamp(),
538                timedelta_from_string(provider.access_code_validity).total_seconds(),
539                delta=5,
540            )

Test full authorization

@apply_blueprint('system/providers-oauth2.yaml')
def test_full_form_post_id_token(self):
542    @apply_blueprint("system/providers-oauth2.yaml")
543    def test_full_form_post_id_token(self):
544        """Test full authorization (form_post response)"""
545        flow = create_test_flow()
546        provider = OAuth2Provider.objects.create(
547            name=generate_id(),
548            client_id=generate_id(),
549            authorization_flow=flow,
550            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
551            signing_key=self.keypair,
552            grant_types=[GrantType.IMPLICIT],
553        )
554        provider.property_mappings.set(
555            ScopeMapping.objects.filter(
556                managed__in=[
557                    "goauthentik.io/providers/oauth2/scope-openid",
558                    "goauthentik.io/providers/oauth2/scope-email",
559                    "goauthentik.io/providers/oauth2/scope-profile",
560                ]
561            )
562        )
563        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
564        state = generate_id()
565        user = create_test_admin_user()
566        self.client.force_login(user)
567        # Step 1, initiate params and get redirect to flow
568        self.client.get(
569            reverse("authentik_providers_oauth2:authorize"),
570            data={
571                "response_type": "id_token",
572                "response_mode": "form_post",
573                "client_id": provider.client_id,
574                "state": state,
575                "scope": "openid",
576                "redirect_uri": "http://localhost",
577                "nonce": generate_id(),
578            },
579        )
580        response = self.client.get(
581            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
582        )
583        token = AccessToken.objects.filter(user=user).first()
584        self.assertIsNotNone(token)
585        self.assertJSONEqual(
586            response.content.decode(),
587            {
588                "component": "ak-stage-autosubmit",
589                "url": "http://localhost",
590                "title": f"Redirecting to {app.name}...",
591                "attrs": {
592                    "id_token": provider.encode(token.id_token.to_dict()),
593                    "token_type": TOKEN_TYPE,
594                    "expires_in": "3600",
595                    "state": state,
596                },
597            },
598        )
599        self.validate_jwt(token, provider)

Test full authorization (form_post response)

def test_full_form_post_code(self):
601    def test_full_form_post_code(self):
602        """Test full authorization (form_post response, code type)"""
603        flow = create_test_flow()
604        provider = OAuth2Provider.objects.create(
605            name=generate_id(),
606            client_id=generate_id(),
607            authorization_flow=flow,
608            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
609            signing_key=self.keypair,
610            grant_types=[GrantType.AUTHORIZATION_CODE],
611        )
612        app = Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
613        state = generate_id()
614        user = create_test_admin_user()
615        self.client.force_login(user)
616        # Step 1, initiate params and get redirect to flow
617        self.client.get(
618            reverse("authentik_providers_oauth2:authorize"),
619            data={
620                "response_type": "code",
621                "response_mode": "form_post",
622                "client_id": provider.client_id,
623                "state": state,
624                "scope": "openid",
625                "redirect_uri": "http://localhost",
626            },
627        )
628        response = self.client.get(
629            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
630        )
631        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
632        self.assertJSONEqual(
633            response.content.decode(),
634            {
635                "component": "ak-stage-autosubmit",
636                "url": "http://localhost",
637                "title": f"Redirecting to {app.name}...",
638                "attrs": {
639                    "code": code.code,
640                    "state": state,
641                },
642            },
643        )

Test full authorization (form_post response, code type)

def test_openid_missing_invalid(self):
645    def test_openid_missing_invalid(self):
646        """test request requiring an OpenID scope to be set"""
647        OAuth2Provider.objects.create(
648            name=generate_id(),
649            client_id="test",
650            authorization_flow=create_test_flow(),
651            grant_types=[GrantType.IMPLICIT],
652            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
653        )
654        request = self.factory.get(
655            "/",
656            data={
657                "response_type": "id_token",
658                "client_id": "test",
659                "redirect_uri": "http://localhost",
660                "scope": "",
661            },
662        )
663        with self.assertRaises(AuthorizeError) as cm:
664            OAuthAuthorizationParams.from_request(request)
665        self.assertEqual(cm.exception.cause, "scope_openid_missing")

test request requiring an OpenID scope to be set

@apply_blueprint('system/providers-oauth2.yaml')
def test_offline_access_invalid(self):
667    @apply_blueprint("system/providers-oauth2.yaml")
668    def test_offline_access_invalid(self):
669        """test request for offline_access with invalid response type"""
670        provider = OAuth2Provider.objects.create(
671            name=generate_id(),
672            client_id="test",
673            authorization_flow=create_test_flow(),
674            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
675            grant_types=[GrantType.IMPLICIT],
676        )
677        provider.property_mappings.set(
678            ScopeMapping.objects.filter(
679                managed__in=[
680                    "goauthentik.io/providers/oauth2/scope-openid",
681                    "goauthentik.io/providers/oauth2/scope-offline_access",
682                ]
683            )
684        )
685        request = self.factory.get(
686            "/",
687            data={
688                "response_type": "id_token",
689                "client_id": "test",
690                "redirect_uri": "http://localhost",
691                "scope": f"{SCOPE_OPENID} {SCOPE_OFFLINE_ACCESS}",
692                "nonce": generate_id(),
693            },
694        )
695        parsed = OAuthAuthorizationParams.from_request(request)
696        self.assertNotIn(SCOPE_OFFLINE_ACCESS, parsed.scope)

test request for offline_access with invalid response type

@apply_blueprint('default/flow-default-authentication-flow.yaml')
def test_ui_locales(self):
698    @apply_blueprint("default/flow-default-authentication-flow.yaml")
699    def test_ui_locales(self):
700        """Test OIDC ui_locales authorization"""
701        flow = create_test_flow()
702        provider = OAuth2Provider.objects.create(
703            name=generate_id(),
704            client_id="test",
705            authorization_flow=flow,
706            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
707            access_code_validity="seconds=100",
708            grant_types=[GrantType.AUTHORIZATION_CODE],
709        )
710        Application.objects.create(name="app", slug="app", provider=provider)
711        state = generate_id()
712        self.client.logout()
713        try:
714            response = self.client.get(
715                reverse("authentik_providers_oauth2:authorize"),
716                data={
717                    "response_type": "code",
718                    "client_id": "test",
719                    "state": state,
720                    "redirect_uri": "foo://localhost",
721                    "ui_locales": "invalid fr",
722                },
723            )
724            parsed = parse_qs(urlparse(response.url).query)
725            self.assertEqual(parsed["locale"], ["fr"])
726        finally:
727            translation.deactivate()

Test OIDC ui_locales authorization

@apply_blueprint('default/flow-default-authentication-flow.yaml')
def test_ui_locales_invalid(self):
729    @apply_blueprint("default/flow-default-authentication-flow.yaml")
730    def test_ui_locales_invalid(self):
731        """Test OIDC ui_locales authorization"""
732        flow = create_test_flow()
733        provider = OAuth2Provider.objects.create(
734            name=generate_id(),
735            client_id="test",
736            authorization_flow=flow,
737            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
738            access_code_validity="seconds=100",
739            grant_types=[GrantType.AUTHORIZATION_CODE],
740        )
741        Application.objects.create(name="app", slug="app", provider=provider)
742        state = generate_id()
743        self.client.logout()
744        response = self.client.get(
745            reverse("authentik_providers_oauth2:authorize"),
746            data={
747                "response_type": "code",
748                "client_id": "test",
749                "state": state,
750                "redirect_uri": "foo://localhost",
751                "ui_locales": "invalid",
752            },
753        )
754        parsed = parse_qs(urlparse(response.url).query)
755        self.assertNotIn("locale", parsed)

Test OIDC ui_locales authorization

def test_authentication_flow(self):
757    def test_authentication_flow(self):
758        """Test custom authentication flow"""
759        brand = create_test_brand()
760        global_auth = create_test_flow()
761        FlowStageBinding.objects.create(
762            target=global_auth, stage=DummyStage.objects.create(name=generate_id()), order=10
763        )
764        brand.flow_authentication = global_auth
765        brand.save()
766
767        flow = create_test_flow()
768        auth_flow = create_test_flow()
769        FlowStageBinding.objects.create(
770            target=auth_flow, stage=DummyStage.objects.create(name=generate_id()), order=10
771        )
772        provider = OAuth2Provider.objects.create(
773            name=generate_id(),
774            client_id="test",
775            authorization_flow=flow,
776            authentication_flow=auth_flow,
777            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
778            access_code_validity="seconds=100",
779            grant_types=[GrantType.AUTHORIZATION_CODE],
780        )
781        Application.objects.create(name="app", slug="app", provider=provider)
782        state = generate_id()
783        response = self.client.get(
784            reverse("authentik_providers_oauth2:authorize"),
785            data={
786                "response_type": "code",
787                "client_id": "test",
788                "state": state,
789                "redirect_uri": "foo://localhost",
790            },
791        )
792        self.assertEqual(response.status_code, 302)
793        self.assertIn(auth_flow.slug, response.url)
794        self.assertNotIn(global_auth.slug, response.url)

Test custom authentication flow

@apply_blueprint('default/flow-default-authentication-flow.yaml')
def test_login_hint(self):
796    @apply_blueprint("default/flow-default-authentication-flow.yaml")
797    def test_login_hint(self):
798        """Login hint"""
799        flow = create_test_flow()
800        provider = OAuth2Provider.objects.create(
801            name=generate_id(),
802            client_id="test",
803            authorization_flow=flow,
804            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
805            access_code_validity="seconds=100",
806            grant_types=[GrantType.AUTHORIZATION_CODE],
807        )
808        Application.objects.create(name="app", slug="app", provider=provider)
809        state = generate_id()
810        response = self.client.get(
811            reverse("authentik_providers_oauth2:authorize"),
812            data={
813                "response_type": "code",
814                "client_id": "test",
815                "state": state,
816                "redirect_uri": "foo://localhost",
817                "login_hint": "foo",
818            },
819        )
820        self.assertEqual(response.status_code, 302)
821        plan = self.client.session.get(SESSION_KEY_PLAN)
822        self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER], "foo")

Login hint