authentik.providers.oauth2.tests.test_token_pkce
Test token view
1"""Test token view""" 2 3from base64 import b64encode 4 5from django.test import RequestFactory 6from django.urls import reverse 7 8from authentik.common.oauth.constants import GRANT_TYPE_AUTHORIZATION_CODE 9from authentik.core.models import Application 10from authentik.core.tests.utils import create_test_admin_user, create_test_flow 11from authentik.lib.generators import generate_id 12from authentik.providers.oauth2.models import ( 13 AuthorizationCode, 14 OAuth2Provider, 15 RedirectURI, 16 RedirectURIMatchingMode, 17) 18from authentik.providers.oauth2.tests.utils import OAuthTestCase 19from authentik.providers.oauth2.utils import pkce_s256_challenge 20 21 22class TestTokenPKCE(OAuthTestCase): 23 """Test token view""" 24 25 def setUp(self) -> None: 26 super().setUp() 27 self.factory = RequestFactory() 28 self.app = Application.objects.create(name=generate_id(), slug="test") 29 30 def test_pkce_missing_in_authorize(self): 31 """Test PKCE with code_challenge in authorize request 32 and missing verifier in token request""" 33 flow = create_test_flow() 34 provider = OAuth2Provider.objects.create( 35 name=generate_id(), 36 client_id="test", 37 authorization_flow=flow, 38 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 39 access_code_validity="seconds=100", 40 ) 41 Application.objects.create(name="app", slug="app", provider=provider) 42 state = generate_id() 43 user = create_test_admin_user() 44 self.client.force_login(user) 45 challenge = generate_id() 46 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 47 # Step 1, initiate params and get redirect to flow 48 response = self.client.get( 49 reverse("authentik_providers_oauth2:authorize"), 50 data={ 51 "response_type": "code", 52 "client_id": "test", 53 "state": state, 54 "redirect_uri": "foo://localhost", 55 "code_challenge": challenge, 56 "code_challenge_method": "S256", 57 }, 58 ) 59 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 60 self.assertEqual( 61 response.url, 62 f"foo://localhost?code={code.code}&state={state}", 63 ) 64 response = self.client.post( 65 reverse("authentik_providers_oauth2:token"), 66 data={ 67 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 68 "code": code.code, 69 # Missing the code_verifier here 70 "redirect_uri": "foo://localhost", 71 }, 72 HTTP_AUTHORIZATION=f"Basic {header}", 73 ) 74 self.assertJSONEqual( 75 response.content, 76 { 77 "error": "invalid_grant", 78 "error_description": ( 79 "The provided authorization grant or refresh token is invalid, expired, " 80 "revoked, does not match the redirection URI used in the authorization " 81 "request, or was issued to another client" 82 ), 83 "request_id": response.headers["X-authentik-id"], 84 }, 85 ) 86 self.assertEqual(response.status_code, 400) 87 88 def test_pkce_missing_in_token(self): 89 """Test PKCE with missing code_challenge in authorization request but verifier 90 set in token request""" 91 flow = create_test_flow() 92 provider = OAuth2Provider.objects.create( 93 name=generate_id(), 94 client_id="test", 95 authorization_flow=flow, 96 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 97 access_code_validity="seconds=100", 98 ) 99 Application.objects.create(name="app", slug="app", provider=provider) 100 state = generate_id() 101 user = create_test_admin_user() 102 self.client.force_login(user) 103 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 104 # Step 1, initiate params and get redirect to flow 105 response = self.client.get( 106 reverse("authentik_providers_oauth2:authorize"), 107 data={ 108 "response_type": "code", 109 "client_id": "test", 110 "state": state, 111 "redirect_uri": "foo://localhost", 112 # "code_challenge": challenge, 113 # "code_challenge_method": "S256", 114 }, 115 ) 116 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 117 self.assertEqual( 118 response.url, 119 f"foo://localhost?code={code.code}&state={state}", 120 ) 121 response = self.client.post( 122 reverse("authentik_providers_oauth2:token"), 123 data={ 124 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 125 "code": code.code, 126 "code_verifier": generate_id(), 127 "redirect_uri": "foo://localhost", 128 }, 129 HTTP_AUTHORIZATION=f"Basic {header}", 130 ) 131 self.assertJSONEqual( 132 response.content, 133 { 134 "error": "invalid_grant", 135 "error_description": ( 136 "The provided authorization grant or refresh token is invalid, expired, " 137 "revoked, does not match the redirection URI used in the authorization " 138 "request, or was issued to another client" 139 ), 140 "request_id": response.headers["X-authentik-id"], 141 }, 142 ) 143 self.assertEqual(response.status_code, 400) 144 145 def test_pkce_correct_s256(self): 146 """Test full with pkce""" 147 flow = create_test_flow() 148 provider = OAuth2Provider.objects.create( 149 name=generate_id(), 150 client_id="test", 151 authorization_flow=flow, 152 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 153 access_code_validity="seconds=100", 154 ) 155 Application.objects.create(name="app", slug="app", provider=provider) 156 state = generate_id() 157 user = create_test_admin_user() 158 self.client.force_login(user) 159 verifier = generate_id() 160 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 161 # Step 1, initiate params and get redirect to flow 162 response = self.client.get( 163 reverse("authentik_providers_oauth2:authorize"), 164 data={ 165 "response_type": "code", 166 "client_id": "test", 167 "state": state, 168 "redirect_uri": "foo://localhost", 169 "code_challenge": pkce_s256_challenge(verifier), 170 "code_challenge_method": "S256", 171 }, 172 ) 173 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 174 self.assertEqual( 175 response.url, 176 f"foo://localhost?code={code.code}&state={state}", 177 ) 178 response = self.client.post( 179 reverse("authentik_providers_oauth2:token"), 180 data={ 181 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 182 "code": code.code, 183 "code_verifier": verifier, 184 "redirect_uri": "foo://localhost", 185 }, 186 HTTP_AUTHORIZATION=f"Basic {header}", 187 ) 188 self.assertEqual(response.status_code, 200) 189 190 def test_pkce_correct_plain(self): 191 """Test full with pkce""" 192 flow = create_test_flow() 193 provider = OAuth2Provider.objects.create( 194 name=generate_id(), 195 client_id="test", 196 authorization_flow=flow, 197 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 198 access_code_validity="seconds=100", 199 ) 200 Application.objects.create(name="app", slug="app", provider=provider) 201 state = generate_id() 202 user = create_test_admin_user() 203 self.client.force_login(user) 204 verifier = generate_id() 205 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 206 # Step 1, initiate params and get redirect to flow 207 response = self.client.get( 208 reverse("authentik_providers_oauth2:authorize"), 209 data={ 210 "response_type": "code", 211 "client_id": "test", 212 "state": state, 213 "redirect_uri": "foo://localhost", 214 "code_challenge": verifier, 215 }, 216 ) 217 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 218 self.assertEqual( 219 response.url, 220 f"foo://localhost?code={code.code}&state={state}", 221 ) 222 response = self.client.post( 223 reverse("authentik_providers_oauth2:token"), 224 data={ 225 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 226 "code": code.code, 227 "code_verifier": verifier, 228 "redirect_uri": "foo://localhost", 229 }, 230 HTTP_AUTHORIZATION=f"Basic {header}", 231 ) 232 self.assertEqual(response.status_code, 200)
23class TestTokenPKCE(OAuthTestCase): 24 """Test token view""" 25 26 def setUp(self) -> None: 27 super().setUp() 28 self.factory = RequestFactory() 29 self.app = Application.objects.create(name=generate_id(), slug="test") 30 31 def test_pkce_missing_in_authorize(self): 32 """Test PKCE with code_challenge in authorize request 33 and missing verifier in token request""" 34 flow = create_test_flow() 35 provider = OAuth2Provider.objects.create( 36 name=generate_id(), 37 client_id="test", 38 authorization_flow=flow, 39 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 40 access_code_validity="seconds=100", 41 ) 42 Application.objects.create(name="app", slug="app", provider=provider) 43 state = generate_id() 44 user = create_test_admin_user() 45 self.client.force_login(user) 46 challenge = generate_id() 47 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 48 # Step 1, initiate params and get redirect to flow 49 response = self.client.get( 50 reverse("authentik_providers_oauth2:authorize"), 51 data={ 52 "response_type": "code", 53 "client_id": "test", 54 "state": state, 55 "redirect_uri": "foo://localhost", 56 "code_challenge": challenge, 57 "code_challenge_method": "S256", 58 }, 59 ) 60 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 61 self.assertEqual( 62 response.url, 63 f"foo://localhost?code={code.code}&state={state}", 64 ) 65 response = self.client.post( 66 reverse("authentik_providers_oauth2:token"), 67 data={ 68 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 69 "code": code.code, 70 # Missing the code_verifier here 71 "redirect_uri": "foo://localhost", 72 }, 73 HTTP_AUTHORIZATION=f"Basic {header}", 74 ) 75 self.assertJSONEqual( 76 response.content, 77 { 78 "error": "invalid_grant", 79 "error_description": ( 80 "The provided authorization grant or refresh token is invalid, expired, " 81 "revoked, does not match the redirection URI used in the authorization " 82 "request, or was issued to another client" 83 ), 84 "request_id": response.headers["X-authentik-id"], 85 }, 86 ) 87 self.assertEqual(response.status_code, 400) 88 89 def test_pkce_missing_in_token(self): 90 """Test PKCE with missing code_challenge in authorization request but verifier 91 set in token request""" 92 flow = create_test_flow() 93 provider = OAuth2Provider.objects.create( 94 name=generate_id(), 95 client_id="test", 96 authorization_flow=flow, 97 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 98 access_code_validity="seconds=100", 99 ) 100 Application.objects.create(name="app", slug="app", provider=provider) 101 state = generate_id() 102 user = create_test_admin_user() 103 self.client.force_login(user) 104 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 105 # Step 1, initiate params and get redirect to flow 106 response = self.client.get( 107 reverse("authentik_providers_oauth2:authorize"), 108 data={ 109 "response_type": "code", 110 "client_id": "test", 111 "state": state, 112 "redirect_uri": "foo://localhost", 113 # "code_challenge": challenge, 114 # "code_challenge_method": "S256", 115 }, 116 ) 117 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 118 self.assertEqual( 119 response.url, 120 f"foo://localhost?code={code.code}&state={state}", 121 ) 122 response = self.client.post( 123 reverse("authentik_providers_oauth2:token"), 124 data={ 125 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 126 "code": code.code, 127 "code_verifier": generate_id(), 128 "redirect_uri": "foo://localhost", 129 }, 130 HTTP_AUTHORIZATION=f"Basic {header}", 131 ) 132 self.assertJSONEqual( 133 response.content, 134 { 135 "error": "invalid_grant", 136 "error_description": ( 137 "The provided authorization grant or refresh token is invalid, expired, " 138 "revoked, does not match the redirection URI used in the authorization " 139 "request, or was issued to another client" 140 ), 141 "request_id": response.headers["X-authentik-id"], 142 }, 143 ) 144 self.assertEqual(response.status_code, 400) 145 146 def test_pkce_correct_s256(self): 147 """Test full with pkce""" 148 flow = create_test_flow() 149 provider = OAuth2Provider.objects.create( 150 name=generate_id(), 151 client_id="test", 152 authorization_flow=flow, 153 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 154 access_code_validity="seconds=100", 155 ) 156 Application.objects.create(name="app", slug="app", provider=provider) 157 state = generate_id() 158 user = create_test_admin_user() 159 self.client.force_login(user) 160 verifier = generate_id() 161 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 162 # Step 1, initiate params and get redirect to flow 163 response = self.client.get( 164 reverse("authentik_providers_oauth2:authorize"), 165 data={ 166 "response_type": "code", 167 "client_id": "test", 168 "state": state, 169 "redirect_uri": "foo://localhost", 170 "code_challenge": pkce_s256_challenge(verifier), 171 "code_challenge_method": "S256", 172 }, 173 ) 174 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 175 self.assertEqual( 176 response.url, 177 f"foo://localhost?code={code.code}&state={state}", 178 ) 179 response = self.client.post( 180 reverse("authentik_providers_oauth2:token"), 181 data={ 182 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 183 "code": code.code, 184 "code_verifier": verifier, 185 "redirect_uri": "foo://localhost", 186 }, 187 HTTP_AUTHORIZATION=f"Basic {header}", 188 ) 189 self.assertEqual(response.status_code, 200) 190 191 def test_pkce_correct_plain(self): 192 """Test full with pkce""" 193 flow = create_test_flow() 194 provider = OAuth2Provider.objects.create( 195 name=generate_id(), 196 client_id="test", 197 authorization_flow=flow, 198 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 199 access_code_validity="seconds=100", 200 ) 201 Application.objects.create(name="app", slug="app", provider=provider) 202 state = generate_id() 203 user = create_test_admin_user() 204 self.client.force_login(user) 205 verifier = generate_id() 206 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 207 # Step 1, initiate params and get redirect to flow 208 response = self.client.get( 209 reverse("authentik_providers_oauth2:authorize"), 210 data={ 211 "response_type": "code", 212 "client_id": "test", 213 "state": state, 214 "redirect_uri": "foo://localhost", 215 "code_challenge": verifier, 216 }, 217 ) 218 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 219 self.assertEqual( 220 response.url, 221 f"foo://localhost?code={code.code}&state={state}", 222 ) 223 response = self.client.post( 224 reverse("authentik_providers_oauth2:token"), 225 data={ 226 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 227 "code": code.code, 228 "code_verifier": verifier, 229 "redirect_uri": "foo://localhost", 230 }, 231 HTTP_AUTHORIZATION=f"Basic {header}", 232 ) 233 self.assertEqual(response.status_code, 200)
Test token view
def
setUp(self) -> None:
26 def setUp(self) -> None: 27 super().setUp() 28 self.factory = RequestFactory() 29 self.app = Application.objects.create(name=generate_id(), slug="test")
Hook method for setting up the test fixture before exercising it.
def
test_pkce_missing_in_token(self):
89 def test_pkce_missing_in_token(self): 90 """Test PKCE with missing code_challenge in authorization request but verifier 91 set in token request""" 92 flow = create_test_flow() 93 provider = OAuth2Provider.objects.create( 94 name=generate_id(), 95 client_id="test", 96 authorization_flow=flow, 97 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 98 access_code_validity="seconds=100", 99 ) 100 Application.objects.create(name="app", slug="app", provider=provider) 101 state = generate_id() 102 user = create_test_admin_user() 103 self.client.force_login(user) 104 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 105 # Step 1, initiate params and get redirect to flow 106 response = self.client.get( 107 reverse("authentik_providers_oauth2:authorize"), 108 data={ 109 "response_type": "code", 110 "client_id": "test", 111 "state": state, 112 "redirect_uri": "foo://localhost", 113 # "code_challenge": challenge, 114 # "code_challenge_method": "S256", 115 }, 116 ) 117 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 118 self.assertEqual( 119 response.url, 120 f"foo://localhost?code={code.code}&state={state}", 121 ) 122 response = self.client.post( 123 reverse("authentik_providers_oauth2:token"), 124 data={ 125 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 126 "code": code.code, 127 "code_verifier": generate_id(), 128 "redirect_uri": "foo://localhost", 129 }, 130 HTTP_AUTHORIZATION=f"Basic {header}", 131 ) 132 self.assertJSONEqual( 133 response.content, 134 { 135 "error": "invalid_grant", 136 "error_description": ( 137 "The provided authorization grant or refresh token is invalid, expired, " 138 "revoked, does not match the redirection URI used in the authorization " 139 "request, or was issued to another client" 140 ), 141 "request_id": response.headers["X-authentik-id"], 142 }, 143 ) 144 self.assertEqual(response.status_code, 400)
Test PKCE with missing code_challenge in authorization request but verifier set in token request
def
test_pkce_correct_s256(self):
146 def test_pkce_correct_s256(self): 147 """Test full with pkce""" 148 flow = create_test_flow() 149 provider = OAuth2Provider.objects.create( 150 name=generate_id(), 151 client_id="test", 152 authorization_flow=flow, 153 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 154 access_code_validity="seconds=100", 155 ) 156 Application.objects.create(name="app", slug="app", provider=provider) 157 state = generate_id() 158 user = create_test_admin_user() 159 self.client.force_login(user) 160 verifier = generate_id() 161 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 162 # Step 1, initiate params and get redirect to flow 163 response = self.client.get( 164 reverse("authentik_providers_oauth2:authorize"), 165 data={ 166 "response_type": "code", 167 "client_id": "test", 168 "state": state, 169 "redirect_uri": "foo://localhost", 170 "code_challenge": pkce_s256_challenge(verifier), 171 "code_challenge_method": "S256", 172 }, 173 ) 174 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 175 self.assertEqual( 176 response.url, 177 f"foo://localhost?code={code.code}&state={state}", 178 ) 179 response = self.client.post( 180 reverse("authentik_providers_oauth2:token"), 181 data={ 182 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 183 "code": code.code, 184 "code_verifier": verifier, 185 "redirect_uri": "foo://localhost", 186 }, 187 HTTP_AUTHORIZATION=f"Basic {header}", 188 ) 189 self.assertEqual(response.status_code, 200)
Test full with pkce
def
test_pkce_correct_plain(self):
191 def test_pkce_correct_plain(self): 192 """Test full with pkce""" 193 flow = create_test_flow() 194 provider = OAuth2Provider.objects.create( 195 name=generate_id(), 196 client_id="test", 197 authorization_flow=flow, 198 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")], 199 access_code_validity="seconds=100", 200 ) 201 Application.objects.create(name="app", slug="app", provider=provider) 202 state = generate_id() 203 user = create_test_admin_user() 204 self.client.force_login(user) 205 verifier = generate_id() 206 header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode() 207 # Step 1, initiate params and get redirect to flow 208 response = self.client.get( 209 reverse("authentik_providers_oauth2:authorize"), 210 data={ 211 "response_type": "code", 212 "client_id": "test", 213 "state": state, 214 "redirect_uri": "foo://localhost", 215 "code_challenge": verifier, 216 }, 217 ) 218 code: AuthorizationCode = AuthorizationCode.objects.filter(user=user).first() 219 self.assertEqual( 220 response.url, 221 f"foo://localhost?code={code.code}&state={state}", 222 ) 223 response = self.client.post( 224 reverse("authentik_providers_oauth2:token"), 225 data={ 226 "grant_type": GRANT_TYPE_AUTHORIZATION_CODE, 227 "code": code.code, 228 "code_verifier": verifier, 229 "redirect_uri": "foo://localhost", 230 }, 231 HTTP_AUTHORIZATION=f"Basic {header}", 232 ) 233 self.assertEqual(response.status_code, 200)
Test full with pkce