authentik.providers.oauth2.tests.test_token_pkce

Test token view

  1"""Test token view"""
  2
  3from base64 import b64encode
  4
  5from django.test import RequestFactory
  6from django.urls import reverse
  7
  8from authentik.common.oauth.constants import GRANT_TYPE_AUTHORIZATION_CODE
  9from authentik.core.models import Application
 10from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 11from authentik.lib.generators import generate_id
 12from authentik.providers.oauth2.models import (
 13    AuthorizationCode,
 14    OAuth2Provider,
 15    RedirectURI,
 16    RedirectURIMatchingMode,
 17)
 18from authentik.providers.oauth2.tests.utils import OAuthTestCase
 19from authentik.providers.oauth2.utils import pkce_s256_challenge
 20
 21
 22class TestTokenPKCE(OAuthTestCase):
 23    """Test token view"""
 24
 25    def setUp(self) -> None:
 26        super().setUp()
 27        self.factory = RequestFactory()
 28        self.app = Application.objects.create(name=generate_id(), slug="test")
 29
 30    def test_pkce_missing_in_authorize(self):
 31        """Test PKCE with code_challenge in authorize request
 32        and missing verifier in token request"""
 33        flow = create_test_flow()
 34        provider = OAuth2Provider.objects.create(
 35            name=generate_id(),
 36            client_id="test",
 37            authorization_flow=flow,
 38            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
 39            access_code_validity="seconds=100",
 40        )
 41        Application.objects.create(name="app", slug="app", provider=provider)
 42        state = generate_id()
 43        user = create_test_admin_user()
 44        self.client.force_login(user)
 45        challenge = generate_id()
 46        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 47        # Step 1, initiate params and get redirect to flow
 48        response = self.client.get(
 49            reverse("authentik_providers_oauth2:authorize"),
 50            data={
 51                "response_type": "code",
 52                "client_id": "test",
 53                "state": state,
 54                "redirect_uri": "foo://localhost",
 55                "code_challenge": challenge,
 56                "code_challenge_method": "S256",
 57            },
 58        )
 59        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
 60        self.assertEqual(
 61            response.url,
 62            f"foo://localhost?code={code.code}&state={state}",
 63        )
 64        response = self.client.post(
 65            reverse("authentik_providers_oauth2:token"),
 66            data={
 67                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 68                "code": code.code,
 69                # Missing the code_verifier here
 70                "redirect_uri": "foo://localhost",
 71            },
 72            HTTP_AUTHORIZATION=f"Basic {header}",
 73        )
 74        self.assertJSONEqual(
 75            response.content,
 76            {
 77                "error": "invalid_grant",
 78                "error_description": (
 79                    "The provided authorization grant or refresh token is invalid, expired, "
 80                    "revoked, does not match the redirection URI used in the authorization "
 81                    "request, or was issued to another client"
 82                ),
 83                "request_id": response.headers["X-authentik-id"],
 84            },
 85        )
 86        self.assertEqual(response.status_code, 400)
 87
 88    def test_pkce_missing_in_token(self):
 89        """Test PKCE with missing code_challenge in authorization request but verifier
 90        set in token request"""
 91        flow = create_test_flow()
 92        provider = OAuth2Provider.objects.create(
 93            name=generate_id(),
 94            client_id="test",
 95            authorization_flow=flow,
 96            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
 97            access_code_validity="seconds=100",
 98        )
 99        Application.objects.create(name="app", slug="app", provider=provider)
100        state = generate_id()
101        user = create_test_admin_user()
102        self.client.force_login(user)
103        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
104        # Step 1, initiate params and get redirect to flow
105        response = self.client.get(
106            reverse("authentik_providers_oauth2:authorize"),
107            data={
108                "response_type": "code",
109                "client_id": "test",
110                "state": state,
111                "redirect_uri": "foo://localhost",
112                # "code_challenge": challenge,
113                # "code_challenge_method": "S256",
114            },
115        )
116        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
117        self.assertEqual(
118            response.url,
119            f"foo://localhost?code={code.code}&state={state}",
120        )
121        response = self.client.post(
122            reverse("authentik_providers_oauth2:token"),
123            data={
124                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
125                "code": code.code,
126                "code_verifier": generate_id(),
127                "redirect_uri": "foo://localhost",
128            },
129            HTTP_AUTHORIZATION=f"Basic {header}",
130        )
131        self.assertJSONEqual(
132            response.content,
133            {
134                "error": "invalid_grant",
135                "error_description": (
136                    "The provided authorization grant or refresh token is invalid, expired, "
137                    "revoked, does not match the redirection URI used in the authorization "
138                    "request, or was issued to another client"
139                ),
140                "request_id": response.headers["X-authentik-id"],
141            },
142        )
143        self.assertEqual(response.status_code, 400)
144
145    def test_pkce_correct_s256(self):
146        """Test full with pkce"""
147        flow = create_test_flow()
148        provider = OAuth2Provider.objects.create(
149            name=generate_id(),
150            client_id="test",
151            authorization_flow=flow,
152            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
153            access_code_validity="seconds=100",
154        )
155        Application.objects.create(name="app", slug="app", provider=provider)
156        state = generate_id()
157        user = create_test_admin_user()
158        self.client.force_login(user)
159        verifier = generate_id()
160        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
161        # Step 1, initiate params and get redirect to flow
162        response = self.client.get(
163            reverse("authentik_providers_oauth2:authorize"),
164            data={
165                "response_type": "code",
166                "client_id": "test",
167                "state": state,
168                "redirect_uri": "foo://localhost",
169                "code_challenge": pkce_s256_challenge(verifier),
170                "code_challenge_method": "S256",
171            },
172        )
173        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
174        self.assertEqual(
175            response.url,
176            f"foo://localhost?code={code.code}&state={state}",
177        )
178        response = self.client.post(
179            reverse("authentik_providers_oauth2:token"),
180            data={
181                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
182                "code": code.code,
183                "code_verifier": verifier,
184                "redirect_uri": "foo://localhost",
185            },
186            HTTP_AUTHORIZATION=f"Basic {header}",
187        )
188        self.assertEqual(response.status_code, 200)
189
190    def test_pkce_correct_plain(self):
191        """Test full with pkce"""
192        flow = create_test_flow()
193        provider = OAuth2Provider.objects.create(
194            name=generate_id(),
195            client_id="test",
196            authorization_flow=flow,
197            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
198            access_code_validity="seconds=100",
199        )
200        Application.objects.create(name="app", slug="app", provider=provider)
201        state = generate_id()
202        user = create_test_admin_user()
203        self.client.force_login(user)
204        verifier = generate_id()
205        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
206        # Step 1, initiate params and get redirect to flow
207        response = self.client.get(
208            reverse("authentik_providers_oauth2:authorize"),
209            data={
210                "response_type": "code",
211                "client_id": "test",
212                "state": state,
213                "redirect_uri": "foo://localhost",
214                "code_challenge": verifier,
215            },
216        )
217        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
218        self.assertEqual(
219            response.url,
220            f"foo://localhost?code={code.code}&state={state}",
221        )
222        response = self.client.post(
223            reverse("authentik_providers_oauth2:token"),
224            data={
225                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
226                "code": code.code,
227                "code_verifier": verifier,
228                "redirect_uri": "foo://localhost",
229            },
230            HTTP_AUTHORIZATION=f"Basic {header}",
231        )
232        self.assertEqual(response.status_code, 200)
class TestTokenPKCE(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 23class TestTokenPKCE(OAuthTestCase):
 24    """Test token view"""
 25
 26    def setUp(self) -> None:
 27        super().setUp()
 28        self.factory = RequestFactory()
 29        self.app = Application.objects.create(name=generate_id(), slug="test")
 30
 31    def test_pkce_missing_in_authorize(self):
 32        """Test PKCE with code_challenge in authorize request
 33        and missing verifier in token request"""
 34        flow = create_test_flow()
 35        provider = OAuth2Provider.objects.create(
 36            name=generate_id(),
 37            client_id="test",
 38            authorization_flow=flow,
 39            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
 40            access_code_validity="seconds=100",
 41        )
 42        Application.objects.create(name="app", slug="app", provider=provider)
 43        state = generate_id()
 44        user = create_test_admin_user()
 45        self.client.force_login(user)
 46        challenge = generate_id()
 47        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
 48        # Step 1, initiate params and get redirect to flow
 49        response = self.client.get(
 50            reverse("authentik_providers_oauth2:authorize"),
 51            data={
 52                "response_type": "code",
 53                "client_id": "test",
 54                "state": state,
 55                "redirect_uri": "foo://localhost",
 56                "code_challenge": challenge,
 57                "code_challenge_method": "S256",
 58            },
 59        )
 60        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
 61        self.assertEqual(
 62            response.url,
 63            f"foo://localhost?code={code.code}&state={state}",
 64        )
 65        response = self.client.post(
 66            reverse("authentik_providers_oauth2:token"),
 67            data={
 68                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
 69                "code": code.code,
 70                # Missing the code_verifier here
 71                "redirect_uri": "foo://localhost",
 72            },
 73            HTTP_AUTHORIZATION=f"Basic {header}",
 74        )
 75        self.assertJSONEqual(
 76            response.content,
 77            {
 78                "error": "invalid_grant",
 79                "error_description": (
 80                    "The provided authorization grant or refresh token is invalid, expired, "
 81                    "revoked, does not match the redirection URI used in the authorization "
 82                    "request, or was issued to another client"
 83                ),
 84                "request_id": response.headers["X-authentik-id"],
 85            },
 86        )
 87        self.assertEqual(response.status_code, 400)
 88
 89    def test_pkce_missing_in_token(self):
 90        """Test PKCE with missing code_challenge in authorization request but verifier
 91        set in token request"""
 92        flow = create_test_flow()
 93        provider = OAuth2Provider.objects.create(
 94            name=generate_id(),
 95            client_id="test",
 96            authorization_flow=flow,
 97            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
 98            access_code_validity="seconds=100",
 99        )
100        Application.objects.create(name="app", slug="app", provider=provider)
101        state = generate_id()
102        user = create_test_admin_user()
103        self.client.force_login(user)
104        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
105        # Step 1, initiate params and get redirect to flow
106        response = self.client.get(
107            reverse("authentik_providers_oauth2:authorize"),
108            data={
109                "response_type": "code",
110                "client_id": "test",
111                "state": state,
112                "redirect_uri": "foo://localhost",
113                # "code_challenge": challenge,
114                # "code_challenge_method": "S256",
115            },
116        )
117        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
118        self.assertEqual(
119            response.url,
120            f"foo://localhost?code={code.code}&state={state}",
121        )
122        response = self.client.post(
123            reverse("authentik_providers_oauth2:token"),
124            data={
125                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
126                "code": code.code,
127                "code_verifier": generate_id(),
128                "redirect_uri": "foo://localhost",
129            },
130            HTTP_AUTHORIZATION=f"Basic {header}",
131        )
132        self.assertJSONEqual(
133            response.content,
134            {
135                "error": "invalid_grant",
136                "error_description": (
137                    "The provided authorization grant or refresh token is invalid, expired, "
138                    "revoked, does not match the redirection URI used in the authorization "
139                    "request, or was issued to another client"
140                ),
141                "request_id": response.headers["X-authentik-id"],
142            },
143        )
144        self.assertEqual(response.status_code, 400)
145
146    def test_pkce_correct_s256(self):
147        """Test full with pkce"""
148        flow = create_test_flow()
149        provider = OAuth2Provider.objects.create(
150            name=generate_id(),
151            client_id="test",
152            authorization_flow=flow,
153            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
154            access_code_validity="seconds=100",
155        )
156        Application.objects.create(name="app", slug="app", provider=provider)
157        state = generate_id()
158        user = create_test_admin_user()
159        self.client.force_login(user)
160        verifier = generate_id()
161        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
162        # Step 1, initiate params and get redirect to flow
163        response = self.client.get(
164            reverse("authentik_providers_oauth2:authorize"),
165            data={
166                "response_type": "code",
167                "client_id": "test",
168                "state": state,
169                "redirect_uri": "foo://localhost",
170                "code_challenge": pkce_s256_challenge(verifier),
171                "code_challenge_method": "S256",
172            },
173        )
174        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
175        self.assertEqual(
176            response.url,
177            f"foo://localhost?code={code.code}&state={state}",
178        )
179        response = self.client.post(
180            reverse("authentik_providers_oauth2:token"),
181            data={
182                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
183                "code": code.code,
184                "code_verifier": verifier,
185                "redirect_uri": "foo://localhost",
186            },
187            HTTP_AUTHORIZATION=f"Basic {header}",
188        )
189        self.assertEqual(response.status_code, 200)
190
191    def test_pkce_correct_plain(self):
192        """Test full with pkce"""
193        flow = create_test_flow()
194        provider = OAuth2Provider.objects.create(
195            name=generate_id(),
196            client_id="test",
197            authorization_flow=flow,
198            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
199            access_code_validity="seconds=100",
200        )
201        Application.objects.create(name="app", slug="app", provider=provider)
202        state = generate_id()
203        user = create_test_admin_user()
204        self.client.force_login(user)
205        verifier = generate_id()
206        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
207        # Step 1, initiate params and get redirect to flow
208        response = self.client.get(
209            reverse("authentik_providers_oauth2:authorize"),
210            data={
211                "response_type": "code",
212                "client_id": "test",
213                "state": state,
214                "redirect_uri": "foo://localhost",
215                "code_challenge": verifier,
216            },
217        )
218        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
219        self.assertEqual(
220            response.url,
221            f"foo://localhost?code={code.code}&state={state}",
222        )
223        response = self.client.post(
224            reverse("authentik_providers_oauth2:token"),
225            data={
226                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
227                "code": code.code,
228                "code_verifier": verifier,
229                "redirect_uri": "foo://localhost",
230            },
231            HTTP_AUTHORIZATION=f"Basic {header}",
232        )
233        self.assertEqual(response.status_code, 200)

Test token view

def setUp(self) -> None:
26    def setUp(self) -> None:
27        super().setUp()
28        self.factory = RequestFactory()
29        self.app = Application.objects.create(name=generate_id(), slug="test")

Hook method for setting up the test fixture before exercising it.

def test_pkce_missing_in_authorize(self):
31    def test_pkce_missing_in_authorize(self):
32        """Test PKCE with code_challenge in authorize request
33        and missing verifier in token request"""
34        flow = create_test_flow()
35        provider = OAuth2Provider.objects.create(
36            name=generate_id(),
37            client_id="test",
38            authorization_flow=flow,
39            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
40            access_code_validity="seconds=100",
41        )
42        Application.objects.create(name="app", slug="app", provider=provider)
43        state = generate_id()
44        user = create_test_admin_user()
45        self.client.force_login(user)
46        challenge = generate_id()
47        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
48        # Step 1, initiate params and get redirect to flow
49        response = self.client.get(
50            reverse("authentik_providers_oauth2:authorize"),
51            data={
52                "response_type": "code",
53                "client_id": "test",
54                "state": state,
55                "redirect_uri": "foo://localhost",
56                "code_challenge": challenge,
57                "code_challenge_method": "S256",
58            },
59        )
60        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
61        self.assertEqual(
62            response.url,
63            f"foo://localhost?code={code.code}&state={state}",
64        )
65        response = self.client.post(
66            reverse("authentik_providers_oauth2:token"),
67            data={
68                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
69                "code": code.code,
70                # Missing the code_verifier here
71                "redirect_uri": "foo://localhost",
72            },
73            HTTP_AUTHORIZATION=f"Basic {header}",
74        )
75        self.assertJSONEqual(
76            response.content,
77            {
78                "error": "invalid_grant",
79                "error_description": (
80                    "The provided authorization grant or refresh token is invalid, expired, "
81                    "revoked, does not match the redirection URI used in the authorization "
82                    "request, or was issued to another client"
83                ),
84                "request_id": response.headers["X-authentik-id"],
85            },
86        )
87        self.assertEqual(response.status_code, 400)

Test PKCE with code_challenge in authorize request and missing verifier in token request

def test_pkce_missing_in_token(self):
 89    def test_pkce_missing_in_token(self):
 90        """Test PKCE with missing code_challenge in authorization request but verifier
 91        set in token request"""
 92        flow = create_test_flow()
 93        provider = OAuth2Provider.objects.create(
 94            name=generate_id(),
 95            client_id="test",
 96            authorization_flow=flow,
 97            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
 98            access_code_validity="seconds=100",
 99        )
100        Application.objects.create(name="app", slug="app", provider=provider)
101        state = generate_id()
102        user = create_test_admin_user()
103        self.client.force_login(user)
104        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
105        # Step 1, initiate params and get redirect to flow
106        response = self.client.get(
107            reverse("authentik_providers_oauth2:authorize"),
108            data={
109                "response_type": "code",
110                "client_id": "test",
111                "state": state,
112                "redirect_uri": "foo://localhost",
113                # "code_challenge": challenge,
114                # "code_challenge_method": "S256",
115            },
116        )
117        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
118        self.assertEqual(
119            response.url,
120            f"foo://localhost?code={code.code}&state={state}",
121        )
122        response = self.client.post(
123            reverse("authentik_providers_oauth2:token"),
124            data={
125                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
126                "code": code.code,
127                "code_verifier": generate_id(),
128                "redirect_uri": "foo://localhost",
129            },
130            HTTP_AUTHORIZATION=f"Basic {header}",
131        )
132        self.assertJSONEqual(
133            response.content,
134            {
135                "error": "invalid_grant",
136                "error_description": (
137                    "The provided authorization grant or refresh token is invalid, expired, "
138                    "revoked, does not match the redirection URI used in the authorization "
139                    "request, or was issued to another client"
140                ),
141                "request_id": response.headers["X-authentik-id"],
142            },
143        )
144        self.assertEqual(response.status_code, 400)

Test PKCE with missing code_challenge in authorization request but verifier set in token request

def test_pkce_correct_s256(self):
146    def test_pkce_correct_s256(self):
147        """Test full with pkce"""
148        flow = create_test_flow()
149        provider = OAuth2Provider.objects.create(
150            name=generate_id(),
151            client_id="test",
152            authorization_flow=flow,
153            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
154            access_code_validity="seconds=100",
155        )
156        Application.objects.create(name="app", slug="app", provider=provider)
157        state = generate_id()
158        user = create_test_admin_user()
159        self.client.force_login(user)
160        verifier = generate_id()
161        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
162        # Step 1, initiate params and get redirect to flow
163        response = self.client.get(
164            reverse("authentik_providers_oauth2:authorize"),
165            data={
166                "response_type": "code",
167                "client_id": "test",
168                "state": state,
169                "redirect_uri": "foo://localhost",
170                "code_challenge": pkce_s256_challenge(verifier),
171                "code_challenge_method": "S256",
172            },
173        )
174        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
175        self.assertEqual(
176            response.url,
177            f"foo://localhost?code={code.code}&state={state}",
178        )
179        response = self.client.post(
180            reverse("authentik_providers_oauth2:token"),
181            data={
182                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
183                "code": code.code,
184                "code_verifier": verifier,
185                "redirect_uri": "foo://localhost",
186            },
187            HTTP_AUTHORIZATION=f"Basic {header}",
188        )
189        self.assertEqual(response.status_code, 200)

Test the full authorization code flow with PKCE using the S256 challenge method

def test_pkce_correct_plain(self):
191    def test_pkce_correct_plain(self):
192        """Test full with pkce"""
193        flow = create_test_flow()
194        provider = OAuth2Provider.objects.create(
195            name=generate_id(),
196            client_id="test",
197            authorization_flow=flow,
198            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
199            access_code_validity="seconds=100",
200        )
201        Application.objects.create(name="app", slug="app", provider=provider)
202        state = generate_id()
203        user = create_test_admin_user()
204        self.client.force_login(user)
205        verifier = generate_id()
206        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
207        # Step 1, initiate params and get redirect to flow
208        response = self.client.get(
209            reverse("authentik_providers_oauth2:authorize"),
210            data={
211                "response_type": "code",
212                "client_id": "test",
213                "state": state,
214                "redirect_uri": "foo://localhost",
215                "code_challenge": verifier,
216            },
217        )
218        code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first()
219        self.assertEqual(
220            response.url,
221            f"foo://localhost?code={code.code}&state={state}",
222        )
223        response = self.client.post(
224            reverse("authentik_providers_oauth2:token"),
225            data={
226                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
227                "code": code.code,
228                "code_verifier": verifier,
229                "redirect_uri": "foo://localhost",
230            },
231            HTTP_AUTHORIZATION=f"Basic {header}",
232        )
233        self.assertEqual(response.status_code, 200)

Test the full authorization code flow with PKCE using the plain challenge method (no code_challenge_method sent)