authentik.providers.oauth2.tests.test_token_pkce
Test token view
"""Test token view"""

from base64 import b64encode

from django.test import RequestFactory
from django.urls import reverse

from authentik.common.oauth.constants import GRANT_TYPE_AUTHORIZATION_CODE
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.models import (
    AuthorizationCode,
    GrantType,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
)
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.utils import pkce_s256_challenge


class TestTokenPKCE(OAuthTestCase):
    """Test the token view's handling of PKCE parameters.

    Every test runs the same two-step authorization-code flow (authorize, then
    token) and only varies which PKCE parameters are sent at each step, so the
    shared steps live in private helpers.
    """

    def setUp(self) -> None:
        super().setUp()
        self.factory = RequestFactory()
        self.app = Application.objects.create(name=generate_id(), slug="test")

    def _authorize(self, **pkce_params) -> tuple:
        """Create a provider, log in an admin user and run the authorize request.

        ``pkce_params`` are appended verbatim to the authorize query string
        (e.g. ``code_challenge``, ``code_challenge_method``).

        Returns a ``(code, header)`` tuple: the issued ``AuthorizationCode`` and
        the base64 payload for the provider's HTTP Basic client credentials.
        """
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
            grant_types=[GrantType.AUTHORIZATION_CODE],
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
        # Step 1, initiate params and get redirect to flow
        response = self.client.get(
            reverse("authentik_providers_oauth2:authorize"),
            data={
                "response_type": "code",
                "client_id": "test",
                "state": state,
                "redirect_uri": "foo://localhost",
                **pkce_params,
            },
        )
        code = AuthorizationCode.objects.filter(user=user).first()
        # first() returns None when nothing matched; fail with a clear message
        # instead of an AttributeError on code.code below.
        self.assertIsNotNone(code, "authorize request did not issue an authorization code")
        self.assertEqual(
            response.url,
            f"foo://localhost?code={code.code}&state={state}",
        )
        return code, header

    def _request_token(self, code, header, **extra):
        """Exchange ``code`` at the token endpoint and return the response.

        ``extra`` carries optional form fields such as ``code_verifier``; they are
        placed between ``code`` and ``redirect_uri`` to keep the original field order.
        """
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data={
                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
                "code": code.code,
                **extra,
                "redirect_uri": "foo://localhost",
            },
            HTTP_AUTHORIZATION=f"Basic {header}",
        )

    def _assert_invalid_grant(self, response):
        """Assert that ``response`` is the ``invalid_grant`` error with HTTP 400."""
        self.assertJSONEqual(
            response.content,
            {
                "error": "invalid_grant",
                "error_description": (
                    "The provided authorization grant or refresh token is invalid, expired, "
                    "revoked, does not match the redirection URI used in the authorization "
                    "request, or was issued to another client"
                ),
                "request_id": response.headers["X-authentik-id"],
            },
        )
        self.assertEqual(response.status_code, 400)

    def test_pkce_missing_in_authorize(self):
        """Test PKCE with code_challenge in authorize request
        and missing verifier in token request

        NOTE(review): the method name reads inverted relative to this scenario
        (the challenge IS sent to authorize); the name is kept so existing test
        selectors keep working.
        """
        code, header = self._authorize(
            code_challenge=generate_id(),
            code_challenge_method="S256",
        )
        # Missing the code_verifier here
        response = self._request_token(code, header)
        self._assert_invalid_grant(response)

    def test_pkce_missing_in_token(self):
        """Test PKCE with missing code_challenge in authorization request but verifier
        set in token request"""
        # No code_challenge / code_challenge_method is sent to authorize.
        code, header = self._authorize()
        response = self._request_token(code, header, code_verifier=generate_id())
        self._assert_invalid_grant(response)

    def test_pkce_correct_s256(self):
        """Test the full flow with PKCE using an S256 challenge and matching verifier"""
        verifier = generate_id()
        code, header = self._authorize(
            code_challenge=pkce_s256_challenge(verifier),
            code_challenge_method="S256",
        )
        response = self._request_token(code, header, code_verifier=verifier)
        self.assertEqual(response.status_code, 200)

    def test_pkce_correct_plain(self):
        """Test the full flow with PKCE where the challenge is the verifier itself

        No code_challenge_method is sent, i.e. the "plain" variant this test is
        named after.
        """
        verifier = generate_id()
        code, header = self._authorize(code_challenge=verifier)
        response = self._request_token(code, header, code_verifier=verifier)
        self.assertEqual(response.status_code, 200)
class TestTokenPKCE(OAuthTestCase):
    """Test the token view's handling of PKCE parameters.

    Every test runs the same two-step authorization-code flow (authorize, then
    token) and only varies which PKCE parameters are sent at each step, so the
    shared steps live in private helpers.
    """

    def setUp(self) -> None:
        super().setUp()
        self.factory = RequestFactory()
        self.app = Application.objects.create(name=generate_id(), slug="test")

    def _authorize(self, **pkce_params) -> tuple:
        """Create a provider, log in an admin user and run the authorize request.

        ``pkce_params`` are appended verbatim to the authorize query string
        (e.g. ``code_challenge``, ``code_challenge_method``).

        Returns a ``(code, header)`` tuple: the issued ``AuthorizationCode`` and
        the base64 payload for the provider's HTTP Basic client credentials.
        """
        flow = create_test_flow()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id="test",
            authorization_flow=flow,
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
            access_code_validity="seconds=100",
            grant_types=[GrantType.AUTHORIZATION_CODE],
        )
        Application.objects.create(name="app", slug="app", provider=provider)
        state = generate_id()
        user = create_test_admin_user()
        self.client.force_login(user)
        header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
        # Step 1, initiate params and get redirect to flow
        response = self.client.get(
            reverse("authentik_providers_oauth2:authorize"),
            data={
                "response_type": "code",
                "client_id": "test",
                "state": state,
                "redirect_uri": "foo://localhost",
                **pkce_params,
            },
        )
        code = AuthorizationCode.objects.filter(user=user).first()
        # first() returns None when nothing matched; fail with a clear message
        # instead of an AttributeError on code.code below.
        self.assertIsNotNone(code, "authorize request did not issue an authorization code")
        self.assertEqual(
            response.url,
            f"foo://localhost?code={code.code}&state={state}",
        )
        return code, header

    def _request_token(self, code, header, **extra):
        """Exchange ``code`` at the token endpoint and return the response.

        ``extra`` carries optional form fields such as ``code_verifier``; they are
        placed between ``code`` and ``redirect_uri`` to keep the original field order.
        """
        return self.client.post(
            reverse("authentik_providers_oauth2:token"),
            data={
                "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
                "code": code.code,
                **extra,
                "redirect_uri": "foo://localhost",
            },
            HTTP_AUTHORIZATION=f"Basic {header}",
        )

    def _assert_invalid_grant(self, response):
        """Assert that ``response`` is the ``invalid_grant`` error with HTTP 400."""
        self.assertJSONEqual(
            response.content,
            {
                "error": "invalid_grant",
                "error_description": (
                    "The provided authorization grant or refresh token is invalid, expired, "
                    "revoked, does not match the redirection URI used in the authorization "
                    "request, or was issued to another client"
                ),
                "request_id": response.headers["X-authentik-id"],
            },
        )
        self.assertEqual(response.status_code, 400)

    def test_pkce_missing_in_authorize(self):
        """Test PKCE with code_challenge in authorize request
        and missing verifier in token request

        NOTE(review): the method name reads inverted relative to this scenario
        (the challenge IS sent to authorize); the name is kept so existing test
        selectors keep working.
        """
        code, header = self._authorize(
            code_challenge=generate_id(),
            code_challenge_method="S256",
        )
        # Missing the code_verifier here
        response = self._request_token(code, header)
        self._assert_invalid_grant(response)

    def test_pkce_missing_in_token(self):
        """Test PKCE with missing code_challenge in authorization request but verifier
        set in token request"""
        # No code_challenge / code_challenge_method is sent to authorize.
        code, header = self._authorize()
        response = self._request_token(code, header, code_verifier=generate_id())
        self._assert_invalid_grant(response)

    def test_pkce_correct_s256(self):
        """Test the full flow with PKCE using an S256 challenge and matching verifier"""
        verifier = generate_id()
        code, header = self._authorize(
            code_challenge=pkce_s256_challenge(verifier),
            code_challenge_method="S256",
        )
        response = self._request_token(code, header, code_verifier=verifier)
        self.assertEqual(response.status_code, 200)

    def test_pkce_correct_plain(self):
        """Test the full flow with PKCE where the challenge is the verifier itself

        No code_challenge_method is sent, i.e. the "plain" variant this test is
        named after.
        """
        verifier = generate_id()
        code, header = self._authorize(code_challenge=verifier)
        response = self._request_token(code, header, code_verifier=verifier)
        self.assertEqual(response.status_code, 200)
Test token view
def
setUp(self) -> None:
def setUp(self) -> None:
    """Build the per-test fixtures: a stored Application and a RequestFactory."""
    super().setUp()
    # The application gets a random name but the fixed slug "test".
    self.app = Application.objects.create(slug="test", name=generate_id())
    self.factory = RequestFactory()
Hook method for setting up the test fixture before exercising it.
def
test_pkce_missing_in_token(self):
def test_pkce_missing_in_token(self):
    """Test PKCE with missing code_challenge in authorization request but verifier
    set in token request

    The authorize request carries no PKCE parameters, yet the token request
    sends a ``code_verifier``. The token endpoint is expected to answer with the
    ``invalid_grant`` error and HTTP 400.
    """
    flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
        grant_types=[GrantType.AUTHORIZATION_CODE],
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    # Payload for the HTTP Basic client credentials used on the token request.
    header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
    # Step 1, initiate params and get redirect to flow
    # (deliberately without code_challenge / code_challenge_method)
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data={
            "response_type": "code",
            "client_id": "test",
            "state": state,
            "redirect_uri": "foo://localhost",
        },
    )
    code: AuthorizationCode | None = AuthorizationCode.objects.filter(user=user).first()
    # first() returns None when nothing matched; fail with a clear message
    # instead of an AttributeError on code.code below.
    self.assertIsNotNone(code, "authorize request did not issue an authorization code")
    self.assertEqual(
        response.url,
        f"foo://localhost?code={code.code}&state={state}",
    )
    # Step 2, redeem the code with a verifier that was never announced
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
            "code": code.code,
            "code_verifier": generate_id(),
            "redirect_uri": "foo://localhost",
        },
        HTTP_AUTHORIZATION=f"Basic {header}",
    )
    self.assertJSONEqual(
        response.content,
        {
            "error": "invalid_grant",
            "error_description": (
                "The provided authorization grant or refresh token is invalid, expired, "
                "revoked, does not match the redirection URI used in the authorization "
                "request, or was issued to another client"
            ),
            "request_id": response.headers["X-authentik-id"],
        },
    )
    self.assertEqual(response.status_code, 400)
Test PKCE with missing code_challenge in authorization request but verifier set in token request
def
test_pkce_correct_s256(self):
def test_pkce_correct_s256(self):
    """Test the full flow with PKCE using an S256 challenge and matching verifier

    The authorize request sends ``pkce_s256_challenge(verifier)`` with
    ``code_challenge_method=S256``; the token request then presents the same
    ``verifier`` and is expected to succeed with HTTP 200.
    """
    flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
        grant_types=[GrantType.AUTHORIZATION_CODE],
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    verifier = generate_id()
    # Payload for the HTTP Basic client credentials used on the token request.
    header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
    # Step 1, initiate params and get redirect to flow
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data={
            "response_type": "code",
            "client_id": "test",
            "state": state,
            "redirect_uri": "foo://localhost",
            "code_challenge": pkce_s256_challenge(verifier),
            "code_challenge_method": "S256",
        },
    )
    code: AuthorizationCode | None = AuthorizationCode.objects.filter(user=user).first()
    # first() returns None when nothing matched; fail with a clear message
    # instead of an AttributeError on code.code below.
    self.assertIsNotNone(code, "authorize request did not issue an authorization code")
    self.assertEqual(
        response.url,
        f"foo://localhost?code={code.code}&state={state}",
    )
    # Step 2, redeem the code with the verifier the challenge was derived from
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
            "code": code.code,
            "code_verifier": verifier,
            "redirect_uri": "foo://localhost",
        },
        HTTP_AUTHORIZATION=f"Basic {header}",
    )
    self.assertEqual(response.status_code, 200)
Test the full authorization code flow with PKCE, using an S256 code challenge and the matching verifier
def
test_pkce_correct_plain(self):
def test_pkce_correct_plain(self):
    """Test the full flow with PKCE where the challenge is the verifier itself

    The authorize request sends the raw ``verifier`` as ``code_challenge`` and
    no ``code_challenge_method`` (the "plain" variant this test is named after);
    the token request presents the same ``verifier`` and is expected to succeed
    with HTTP 200.
    """
    flow = create_test_flow()
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id="test",
        authorization_flow=flow,
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "foo://localhost")],
        access_code_validity="seconds=100",
        grant_types=[GrantType.AUTHORIZATION_CODE],
    )
    Application.objects.create(name="app", slug="app", provider=provider)
    state = generate_id()
    user = create_test_admin_user()
    self.client.force_login(user)
    verifier = generate_id()
    # Payload for the HTTP Basic client credentials used on the token request.
    header = b64encode(f"{provider.client_id}:{provider.client_secret}".encode()).decode()
    # Step 1, initiate params and get redirect to flow
    response = self.client.get(
        reverse("authentik_providers_oauth2:authorize"),
        data={
            "response_type": "code",
            "client_id": "test",
            "state": state,
            "redirect_uri": "foo://localhost",
            "code_challenge": verifier,
        },
    )
    code: AuthorizationCode | None = AuthorizationCode.objects.filter(user=user).first()
    # first() returns None when nothing matched; fail with a clear message
    # instead of an AttributeError on code.code below.
    self.assertIsNotNone(code, "authorize request did not issue an authorization code")
    self.assertEqual(
        response.url,
        f"foo://localhost?code={code.code}&state={state}",
    )
    # Step 2, redeem the code with the same verifier
    response = self.client.post(
        reverse("authentik_providers_oauth2:token"),
        data={
            "grant_type": GRANT_TYPE_AUTHORIZATION_CODE,
            "code": code.code,
            "code_verifier": verifier,
            "redirect_uri": "foo://localhost",
        },
        HTTP_AUTHORIZATION=f"Basic {header}",
    )
    self.assertEqual(response.status_code, 200)
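Test the full authorization code flow with PKCE, using the plain method (the code challenge is the verifier itself and no code_challenge_method is sent)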