authentik.sources.oauth.tests.test_views
OAuth Source tests
1"""OAuth Source tests""" 2 3from urllib.parse import parse_qs 4 5from django.urls import reverse 6from requests_mock import Mocker 7from rest_framework.test import APITestCase 8 9from authentik.core.models import User 10from authentik.core.tests.utils import create_test_admin_user 11from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan 12from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER 13from authentik.flows.views.executor import SESSION_KEY_PLAN 14from authentik.lib.generators import generate_id 15from authentik.providers.oauth2.utils import pkce_s256_challenge 16from authentik.sources.oauth.api.source import OAuthSourceSerializer 17from authentik.sources.oauth.clients.oauth2 import SESSION_KEY_OAUTH_PKCE 18from authentik.sources.oauth.models import OAuthSource, PKCEMethod 19 20 21class TestOAuthSource(APITestCase): 22 """OAuth Source tests""" 23 24 def setUp(self): 25 self.source = OAuthSource.objects.create( 26 name="test", 27 slug="test", 28 provider_type="openidconnect", 29 authorization_url="", 30 profile_url="", 31 consumer_key="", 32 ) 33 34 def test_api_read(self): 35 """Test reading a source""" 36 self.client.force_login(create_test_admin_user()) 37 response = self.client.get( 38 reverse( 39 "authentik_api:oauthsource-detail", 40 kwargs={ 41 "slug": self.source.slug, 42 }, 43 ) 44 ) 45 self.assertEqual(response.status_code, 200) 46 47 def test_api_validate(self): 48 """Test API validation""" 49 self.assertTrue( 50 OAuthSourceSerializer( 51 data={ 52 "name": "foo", 53 "slug": "bar", 54 "provider_type": "google", 55 "consumer_key": "foo", 56 "consumer_secret": "foo", 57 "oidc_well_known_url": "", 58 "oidc_jwks_url": "", 59 } 60 ).is_valid() 61 ) 62 self.assertFalse( 63 OAuthSourceSerializer( 64 data={ 65 "name": "foo", 66 "slug": "bar", 67 "provider_type": "openidconnect", 68 "consumer_key": "foo", 69 "consumer_secret": "foo", 70 } 71 ).is_valid() 72 ) 73 74 def test_api_validate_openid_connect(self): 75 """Test API validation (with OIDC endpoints)""" 76 openid_config = { 77 "issuer": "foo", 78 "authorization_endpoint": "http://mock/oauth/authorize", 79 "token_endpoint": "http://mock/oauth/token", 80 "userinfo_endpoint": "http://mock/oauth/userinfo", 81 "jwks_uri": "http://mock/oauth/discovery/keys", 82 "code_challenge_methods_supported": ["S256"], 83 } 84 jwks_config = {"keys": []} 85 with Mocker() as mocker: 86 url = "http://mock/.well-known/openid-configuration" 87 mocker.get(url, json=openid_config) 88 mocker.get(openid_config["jwks_uri"], json=jwks_config) 89 serializer = OAuthSourceSerializer( 90 instance=self.source, 91 data={ 92 "name": "foo", 93 "slug": "bar", 94 "provider_type": "openidconnect", 95 "consumer_key": "foo", 96 "consumer_secret": "foo", 97 "oidc_well_known_url": url, 98 "oidc_jwks_url": "", 99 }, 100 ) 101 self.assertTrue(serializer.is_valid()) 102 self.assertEqual( 103 serializer.validated_data["authorization_url"], "http://mock/oauth/authorize" 104 ) 105 self.assertEqual( 106 serializer.validated_data["access_token_url"], "http://mock/oauth/token" 107 ) 108 self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo") 109 self.assertEqual( 110 serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys" 111 ) 112 self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config) 113 self.assertEqual(serializer.validated_data["pkce"], PKCEMethod.S256) 114 115 def test_api_validate_openid_connect_invalid(self): 116 """Test API validation (with OIDC endpoints)""" 117 openid_config = {} 118 with Mocker() as mocker: 119 url = "http://mock/.well-known/openid-configuration" 120 mocker.get(url, json=openid_config) 121 serializer = OAuthSourceSerializer( 122 instance=self.source, 123 data={ 124 "name": "foo", 125 "slug": "bar", 126 "provider_type": "openidconnect", 127 "consumer_key": "foo", 128 "consumer_secret": "foo", 129 "authorization_url": "http://foo", 130 "access_token_url": "http://foo", 131 "profile_url": "http://foo", 132 "oidc_well_known_url": url, 133 "oidc_jwks_url": "", 134 }, 135 ) 136 self.assertFalse(serializer.is_valid()) 137 138 def test_source_redirect_login_hint_user(self): 139 """test redirect view with login hint""" 140 user = User(email="foo@authentik.company") 141 session = self.client.session 142 plan = FlowPlan(generate_id()) 143 plan.context[PLAN_CONTEXT_PENDING_USER] = user 144 session[SESSION_KEY_PLAN] = plan 145 session.save() 146 147 res = self.client.get( 148 reverse( 149 "authentik_sources_oauth:oauth-client-login", 150 kwargs={"source_slug": self.source.slug}, 151 ) 152 ) 153 self.assertEqual(res.status_code, 302) 154 qs = parse_qs(res.url) 155 self.assertEqual(qs["login_hint"], ["foo@authentik.company"]) 156 157 def test_source_redirect_login_hint_user_identifier(self): 158 """test redirect view with login hint""" 159 session = self.client.session 160 plan = FlowPlan(generate_id()) 161 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company" 162 session[SESSION_KEY_PLAN] = plan 163 session.save() 164 165 res = self.client.get( 166 reverse( 167 "authentik_sources_oauth:oauth-client-login", 168 kwargs={"source_slug": self.source.slug}, 169 ) 170 ) 171 self.assertEqual(res.status_code, 302) 172 qs = parse_qs(res.url) 173 self.assertEqual(qs["login_hint"], ["foo@authentik.company"]) 174 175 def test_source_redirect(self): 176 """test redirect view""" 177 res = self.client.get( 178 reverse( 179 "authentik_sources_oauth:oauth-client-login", 180 kwargs={"source_slug": self.source.slug}, 181 ) 182 ) 183 self.assertEqual(res.status_code, 302) 184 qs = parse_qs(res.url) 185 186 session = self.client.session 187 state = session[f"oauth-client-{self.source.name}-request-state"] 188 189 self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"]) 190 self.assertEqual(qs["response_type"], ["code"]) 191 self.assertEqual(qs["state"], [state]) 192 self.assertEqual(qs["scope"], ["email openid profile"]) 193 194 def test_source_redirect_pkce(self): 195 """test redirect view""" 196 self.source.pkce = PKCEMethod.S256 197 self.source.save() 198 res = self.client.get( 199 reverse( 200 "authentik_sources_oauth:oauth-client-login", 201 kwargs={"source_slug": self.source.slug}, 202 ) 203 ) 204 self.assertEqual(res.status_code, 302) 205 qs = parse_qs(res.url) 206 207 session = self.client.session 208 state = session[f"oauth-client-{self.source.name}-request-state"] 209 verifier = session[SESSION_KEY_OAUTH_PKCE] 210 self.assertEqual(len(verifier), 128) 211 challenge = pkce_s256_challenge(verifier) 212 213 self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"]) 214 self.assertEqual(qs["response_type"], ["code"]) 215 self.assertEqual(qs["state"], [state]) 216 self.assertEqual(qs["scope"], ["email openid profile"]) 217 self.assertEqual(qs["code_challenge"], [challenge]) 218 self.assertEqual(qs["code_challenge_method"], ["S256"]) 219 220 def test_source_callback(self): 221 """test callback view""" 222 res = self.client.get( 223 reverse( 224 "authentik_sources_oauth:oauth-client-callback", 225 kwargs={"source_slug": self.source.slug}, 226 ) 227 ) 228 self.assertEqual(res.status_code, 302)
class
TestOAuthSource(rest_framework.test.APITestCase):
22class TestOAuthSource(APITestCase): 23 """OAuth Source tests""" 24 25 def setUp(self): 26 self.source = OAuthSource.objects.create( 27 name="test", 28 slug="test", 29 provider_type="openidconnect", 30 authorization_url="", 31 profile_url="", 32 consumer_key="", 33 ) 34 35 def test_api_read(self): 36 """Test reading a source""" 37 self.client.force_login(create_test_admin_user()) 38 response = self.client.get( 39 reverse( 40 "authentik_api:oauthsource-detail", 41 kwargs={ 42 "slug": self.source.slug, 43 }, 44 ) 45 ) 46 self.assertEqual(response.status_code, 200) 47 48 def test_api_validate(self): 49 """Test API validation""" 50 self.assertTrue( 51 OAuthSourceSerializer( 52 data={ 53 "name": "foo", 54 "slug": "bar", 55 "provider_type": "google", 56 "consumer_key": "foo", 57 "consumer_secret": "foo", 58 "oidc_well_known_url": "", 59 "oidc_jwks_url": "", 60 } 61 ).is_valid() 62 ) 63 self.assertFalse( 64 OAuthSourceSerializer( 65 data={ 66 "name": "foo", 67 "slug": "bar", 68 "provider_type": "openidconnect", 69 "consumer_key": "foo", 70 "consumer_secret": "foo", 71 } 72 ).is_valid() 73 ) 74 75 def test_api_validate_openid_connect(self): 76 """Test API validation (with OIDC endpoints)""" 77 openid_config = { 78 "issuer": "foo", 79 "authorization_endpoint": "http://mock/oauth/authorize", 80 "token_endpoint": "http://mock/oauth/token", 81 "userinfo_endpoint": "http://mock/oauth/userinfo", 82 "jwks_uri": "http://mock/oauth/discovery/keys", 83 "code_challenge_methods_supported": ["S256"], 84 } 85 jwks_config = {"keys": []} 86 with Mocker() as mocker: 87 url = "http://mock/.well-known/openid-configuration" 88 mocker.get(url, json=openid_config) 89 mocker.get(openid_config["jwks_uri"], json=jwks_config) 90 serializer = OAuthSourceSerializer( 91 instance=self.source, 92 data={ 93 "name": "foo", 94 "slug": "bar", 95 "provider_type": "openidconnect", 96 "consumer_key": "foo", 97 "consumer_secret": "foo", 98 "oidc_well_known_url": url, 99 "oidc_jwks_url": "", 100 }, 101 ) 102 self.assertTrue(serializer.is_valid()) 103 self.assertEqual( 104 serializer.validated_data["authorization_url"], "http://mock/oauth/authorize" 105 ) 106 self.assertEqual( 107 serializer.validated_data["access_token_url"], "http://mock/oauth/token" 108 ) 109 self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo") 110 self.assertEqual( 111 serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys" 112 ) 113 self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config) 114 self.assertEqual(serializer.validated_data["pkce"], PKCEMethod.S256) 115 116 def test_api_validate_openid_connect_invalid(self): 117 """Test API validation (with OIDC endpoints)""" 118 openid_config = {} 119 with Mocker() as mocker: 120 url = "http://mock/.well-known/openid-configuration" 121 mocker.get(url, json=openid_config) 122 serializer = OAuthSourceSerializer( 123 instance=self.source, 124 data={ 125 "name": "foo", 126 "slug": "bar", 127 "provider_type": "openidconnect", 128 "consumer_key": "foo", 129 "consumer_secret": "foo", 130 "authorization_url": "http://foo", 131 "access_token_url": "http://foo", 132 "profile_url": "http://foo", 133 "oidc_well_known_url": url, 134 "oidc_jwks_url": "", 135 }, 136 ) 137 self.assertFalse(serializer.is_valid()) 138 139 def test_source_redirect_login_hint_user(self): 140 """test redirect view with login hint""" 141 user = User(email="foo@authentik.company") 142 session = self.client.session 143 plan = FlowPlan(generate_id()) 144 plan.context[PLAN_CONTEXT_PENDING_USER] = user 145 session[SESSION_KEY_PLAN] = plan 146 session.save() 147 148 res = self.client.get( 149 reverse( 150 "authentik_sources_oauth:oauth-client-login", 151 kwargs={"source_slug": self.source.slug}, 152 ) 153 ) 154 self.assertEqual(res.status_code, 302) 155 qs = parse_qs(res.url) 156 self.assertEqual(qs["login_hint"], ["foo@authentik.company"]) 157 158 def test_source_redirect_login_hint_user_identifier(self): 159 """test redirect view with login hint""" 160 session = self.client.session 161 plan = FlowPlan(generate_id()) 162 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company" 163 session[SESSION_KEY_PLAN] = plan 164 session.save() 165 166 res = self.client.get( 167 reverse( 168 "authentik_sources_oauth:oauth-client-login", 169 kwargs={"source_slug": self.source.slug}, 170 ) 171 ) 172 self.assertEqual(res.status_code, 302) 173 qs = parse_qs(res.url) 174 self.assertEqual(qs["login_hint"], ["foo@authentik.company"]) 175 176 def test_source_redirect(self): 177 """test redirect view""" 178 res = self.client.get( 179 reverse( 180 "authentik_sources_oauth:oauth-client-login", 181 kwargs={"source_slug": self.source.slug}, 182 ) 183 ) 184 self.assertEqual(res.status_code, 302) 185 qs = parse_qs(res.url) 186 187 session = self.client.session 188 state = session[f"oauth-client-{self.source.name}-request-state"] 189 190 self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"]) 191 self.assertEqual(qs["response_type"], ["code"]) 192 self.assertEqual(qs["state"], [state]) 193 self.assertEqual(qs["scope"], ["email openid profile"]) 194 195 def test_source_redirect_pkce(self): 196 """test redirect view""" 197 self.source.pkce = PKCEMethod.S256 198 self.source.save() 199 res = self.client.get( 200 reverse( 201 "authentik_sources_oauth:oauth-client-login", 202 kwargs={"source_slug": self.source.slug}, 203 ) 204 ) 205 self.assertEqual(res.status_code, 302) 206 qs = parse_qs(res.url) 207 208 session = self.client.session 209 state = session[f"oauth-client-{self.source.name}-request-state"] 210 verifier = session[SESSION_KEY_OAUTH_PKCE] 211 self.assertEqual(len(verifier), 128) 212 challenge = pkce_s256_challenge(verifier) 213 214 self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"]) 215 self.assertEqual(qs["response_type"], ["code"]) 216 self.assertEqual(qs["state"], [state]) 217 self.assertEqual(qs["scope"], ["email openid profile"]) 218 self.assertEqual(qs["code_challenge"], [challenge]) 219 self.assertEqual(qs["code_challenge_method"], ["S256"]) 220 221 def test_source_callback(self): 222 """test callback view""" 223 res = self.client.get( 224 reverse( 225 "authentik_sources_oauth:oauth-client-callback", 226 kwargs={"source_slug": self.source.slug}, 227 ) 228 ) 229 self.assertEqual(res.status_code, 302)
OAuth Source tests
def
setUp(self):
25 def setUp(self): 26 self.source = OAuthSource.objects.create( 27 name="test", 28 slug="test", 29 provider_type="openidconnect", 30 authorization_url="", 31 profile_url="", 32 consumer_key="", 33 )
Hook method for setting up the test fixture before exercising it.
def
test_api_read(self):
35 def test_api_read(self): 36 """Test reading a source""" 37 self.client.force_login(create_test_admin_user()) 38 response = self.client.get( 39 reverse( 40 "authentik_api:oauthsource-detail", 41 kwargs={ 42 "slug": self.source.slug, 43 }, 44 ) 45 ) 46 self.assertEqual(response.status_code, 200)
Test reading a source
def
test_api_validate(self):
48 def test_api_validate(self): 49 """Test API validation""" 50 self.assertTrue( 51 OAuthSourceSerializer( 52 data={ 53 "name": "foo", 54 "slug": "bar", 55 "provider_type": "google", 56 "consumer_key": "foo", 57 "consumer_secret": "foo", 58 "oidc_well_known_url": "", 59 "oidc_jwks_url": "", 60 } 61 ).is_valid() 62 ) 63 self.assertFalse( 64 OAuthSourceSerializer( 65 data={ 66 "name": "foo", 67 "slug": "bar", 68 "provider_type": "openidconnect", 69 "consumer_key": "foo", 70 "consumer_secret": "foo", 71 } 72 ).is_valid() 73 )
Test API validation
def
test_api_validate_openid_connect(self):
75 def test_api_validate_openid_connect(self): 76 """Test API validation (with OIDC endpoints)""" 77 openid_config = { 78 "issuer": "foo", 79 "authorization_endpoint": "http://mock/oauth/authorize", 80 "token_endpoint": "http://mock/oauth/token", 81 "userinfo_endpoint": "http://mock/oauth/userinfo", 82 "jwks_uri": "http://mock/oauth/discovery/keys", 83 "code_challenge_methods_supported": ["S256"], 84 } 85 jwks_config = {"keys": []} 86 with Mocker() as mocker: 87 url = "http://mock/.well-known/openid-configuration" 88 mocker.get(url, json=openid_config) 89 mocker.get(openid_config["jwks_uri"], json=jwks_config) 90 serializer = OAuthSourceSerializer( 91 instance=self.source, 92 data={ 93 "name": "foo", 94 "slug": "bar", 95 "provider_type": "openidconnect", 96 "consumer_key": "foo", 97 "consumer_secret": "foo", 98 "oidc_well_known_url": url, 99 "oidc_jwks_url": "", 100 }, 101 ) 102 self.assertTrue(serializer.is_valid()) 103 self.assertEqual( 104 serializer.validated_data["authorization_url"], "http://mock/oauth/authorize" 105 ) 106 self.assertEqual( 107 serializer.validated_data["access_token_url"], "http://mock/oauth/token" 108 ) 109 self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo") 110 self.assertEqual( 111 serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys" 112 ) 113 self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config) 114 self.assertEqual(serializer.validated_data["pkce"], PKCEMethod.S256)
Test API validation (with OIDC endpoints)
def
test_api_validate_openid_connect_invalid(self):
116 def test_api_validate_openid_connect_invalid(self): 117 """Test API validation (with OIDC endpoints)""" 118 openid_config = {} 119 with Mocker() as mocker: 120 url = "http://mock/.well-known/openid-configuration" 121 mocker.get(url, json=openid_config) 122 serializer = OAuthSourceSerializer( 123 instance=self.source, 124 data={ 125 "name": "foo", 126 "slug": "bar", 127 "provider_type": "openidconnect", 128 "consumer_key": "foo", 129 "consumer_secret": "foo", 130 "authorization_url": "http://foo", 131 "access_token_url": "http://foo", 132 "profile_url": "http://foo", 133 "oidc_well_known_url": url, 134 "oidc_jwks_url": "", 135 }, 136 ) 137 self.assertFalse(serializer.is_valid())
Test API validation (with OIDC endpoints)
def
test_source_redirect_login_hint_user(self):
139 def test_source_redirect_login_hint_user(self): 140 """test redirect view with login hint""" 141 user = User(email="foo@authentik.company") 142 session = self.client.session 143 plan = FlowPlan(generate_id()) 144 plan.context[PLAN_CONTEXT_PENDING_USER] = user 145 session[SESSION_KEY_PLAN] = plan 146 session.save() 147 148 res = self.client.get( 149 reverse( 150 "authentik_sources_oauth:oauth-client-login", 151 kwargs={"source_slug": self.source.slug}, 152 ) 153 ) 154 self.assertEqual(res.status_code, 302) 155 qs = parse_qs(res.url) 156 self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
test redirect view with login hint
def
test_source_redirect_login_hint_user_identifier(self):
158 def test_source_redirect_login_hint_user_identifier(self): 159 """test redirect view with login hint""" 160 session = self.client.session 161 plan = FlowPlan(generate_id()) 162 plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company" 163 session[SESSION_KEY_PLAN] = plan 164 session.save() 165 166 res = self.client.get( 167 reverse( 168 "authentik_sources_oauth:oauth-client-login", 169 kwargs={"source_slug": self.source.slug}, 170 ) 171 ) 172 self.assertEqual(res.status_code, 302) 173 qs = parse_qs(res.url) 174 self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
test redirect view with login hint
def
test_source_redirect(self):
176 def test_source_redirect(self): 177 """test redirect view""" 178 res = self.client.get( 179 reverse( 180 "authentik_sources_oauth:oauth-client-login", 181 kwargs={"source_slug": self.source.slug}, 182 ) 183 ) 184 self.assertEqual(res.status_code, 302) 185 qs = parse_qs(res.url) 186 187 session = self.client.session 188 state = session[f"oauth-client-{self.source.name}-request-state"] 189 190 self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"]) 191 self.assertEqual(qs["response_type"], ["code"]) 192 self.assertEqual(qs["state"], [state]) 193 self.assertEqual(qs["scope"], ["email openid profile"])
test redirect view
def
test_source_redirect_pkce(self):
195 def test_source_redirect_pkce(self): 196 """test redirect view""" 197 self.source.pkce = PKCEMethod.S256 198 self.source.save() 199 res = self.client.get( 200 reverse( 201 "authentik_sources_oauth:oauth-client-login", 202 kwargs={"source_slug": self.source.slug}, 203 ) 204 ) 205 self.assertEqual(res.status_code, 302) 206 qs = parse_qs(res.url) 207 208 session = self.client.session 209 state = session[f"oauth-client-{self.source.name}-request-state"] 210 verifier = session[SESSION_KEY_OAUTH_PKCE] 211 self.assertEqual(len(verifier), 128) 212 challenge = pkce_s256_challenge(verifier) 213 214 self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"]) 215 self.assertEqual(qs["response_type"], ["code"]) 216 self.assertEqual(qs["state"], [state]) 217 self.assertEqual(qs["scope"], ["email openid profile"]) 218 self.assertEqual(qs["code_challenge"], [challenge]) 219 self.assertEqual(qs["code_challenge_method"], ["S256"])
test redirect view
def
test_source_callback(self):
221 def test_source_callback(self): 222 """test callback view""" 223 res = self.client.get( 224 reverse( 225 "authentik_sources_oauth:oauth-client-callback", 226 kwargs={"source_slug": self.source.slug}, 227 ) 228 ) 229 self.assertEqual(res.status_code, 302)
test callback view