authentik.sources.oauth.tests.test_views
OAuth Source tests
"""OAuth Source tests"""

from urllib.parse import parse_qs

from django.urls import reverse
from requests_mock import Mocker
from rest_framework.test import APITestCase

from authentik.core.models import User
from authentik.core.tests.utils import create_test_admin_user
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.utils import pkce_s256_challenge
from authentik.sources.oauth.api.source import OAuthSourceSerializer
from authentik.sources.oauth.clients.oauth2 import SESSION_KEY_OAUTH_PKCE
from authentik.sources.oauth.models import OAuthSource, PKCEMethod


class TestOAuthSource(APITestCase):
    """OAuth Source tests"""

    def setUp(self):
        """Create the OAuth source that the tests in this class run against"""
        # Endpoint URLs and the consumer key are left empty on purpose.
        self.source = OAuthSource.objects.create(
            name="test",
            slug="test",
            provider_type="openidconnect",
            authorization_url="",
            profile_url="",
            consumer_key="",
        )

    def test_api_read(self):
        """Test reading a source through the API detail endpoint as an admin user"""
        self.client.force_login(create_test_admin_user())
        response = self.client.get(
            reverse(
                "authentik_api:oauthsource-detail",
                kwargs={
                    "slug": self.source.slug,
                },
            )
        )
        self.assertEqual(response.status_code, 200)

    def test_api_validate(self):
        """Test API validation without OIDC discovery"""
        # A "google" source validates with both OIDC URLs left blank.
        self.assertTrue(
            OAuthSourceSerializer(
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "google",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                    "oidc_well_known_url": "",
                    "oidc_jwks_url": "",
                }
            ).is_valid()
        )
        # An "openidconnect" source with no URLs in the payload is rejected.
        self.assertFalse(
            OAuthSourceSerializer(
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "openidconnect",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                }
            ).is_valid()
        )

    def test_api_validate_openid_connect(self):
        """Test API validation (with OIDC endpoints)"""
        openid_config = {
            "issuer": "foo",
            "authorization_endpoint": "http://mock/oauth/authorize",
            "token_endpoint": "http://mock/oauth/token",
            "userinfo_endpoint": "http://mock/oauth/userinfo",
            "jwks_uri": "http://mock/oauth/discovery/keys",
        }
        jwks_config = {"keys": []}
        with Mocker() as mocker:
            url = "http://mock/.well-known/openid-configuration"
            mocker.get(url, json=openid_config)
            mocker.get(openid_config["jwks_uri"], json=jwks_config)
            serializer = OAuthSourceSerializer(
                instance=self.source,
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "openidconnect",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                    "oidc_well_known_url": url,
                    "oidc_jwks_url": "",
                },
            )
            self.assertTrue(serializer.is_valid())
            # The validated data must carry the values served by the mocked endpoints.
            self.assertEqual(
                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
            )
            self.assertEqual(
                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
            )
            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
            self.assertEqual(
                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
            )
            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)

    def test_api_validate_openid_connect_invalid(self):
        """Test API validation (with an OIDC well-known URL returning an empty document)"""
        openid_config = {}
        with Mocker() as mocker:
            url = "http://mock/.well-known/openid-configuration"
            mocker.get(url, json=openid_config)
            serializer = OAuthSourceSerializer(
                instance=self.source,
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "openidconnect",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                    "authorization_url": "http://foo",
                    "access_token_url": "http://foo",
                    "profile_url": "http://foo",
                    "oidc_well_known_url": url,
                    "oidc_jwks_url": "",
                },
            )
            self.assertFalse(serializer.is_valid())

    def test_source_redirect_login_hint_user(self):
        """test redirect view with login hint taken from the pending user's email"""
        user = User(email="foo@authentik.company")
        session = self.client.session
        plan = FlowPlan(generate_id())
        plan.context[PLAN_CONTEXT_PENDING_USER] = user
        session[SESSION_KEY_PLAN] = plan
        session.save()

        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])
        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

    def test_source_redirect_login_hint_user_identifier(self):
        """test redirect view with login hint taken from the pending user identifier"""
        session = self.client.session
        plan = FlowPlan(generate_id())
        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
        session[SESSION_KEY_PLAN] = plan
        session.save()

        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])
        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

    def test_source_redirect(self):
        """test redirect view"""
        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])

        # The state sent to the provider must be the one stored in the session.
        session = self.client.session
        state = session[f"oauth-client-{self.source.name}-request-state"]

        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
        self.assertEqual(qs["response_type"], ["code"])
        self.assertEqual(qs["state"], [state])
        self.assertEqual(qs["scope"], ["email openid profile"])

    def test_source_redirect_pkce(self):
        """test redirect view with PKCE (S256) enabled on the source"""
        self.source.pkce = PKCEMethod.S256
        self.source.save()
        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])

        session = self.client.session
        state = session[f"oauth-client-{self.source.name}-request-state"]
        verifier = session[SESSION_KEY_OAUTH_PKCE]
        self.assertEqual(len(verifier), 128)
        # The challenge in the URL must be the S256 challenge of the verifier kept in the session.
        challenge = pkce_s256_challenge(verifier)

        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
        self.assertEqual(qs["response_type"], ["code"])
        self.assertEqual(qs["state"], [state])
        self.assertEqual(qs["scope"], ["email openid profile"])
        self.assertEqual(qs["code_challenge"], [challenge])
        self.assertEqual(qs["code_challenge_method"], ["S256"])

    def test_source_callback(self):
        """test callback view called without any query parameters"""
        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-callback",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
class
TestOAuthSource(rest_framework.test.APITestCase):
class TestOAuthSource(APITestCase):
    """OAuth Source tests"""

    def setUp(self):
        """Create the OAuth source that the tests in this class run against"""
        # Endpoint URLs and the consumer key are left empty on purpose.
        self.source = OAuthSource.objects.create(
            name="test",
            slug="test",
            provider_type="openidconnect",
            authorization_url="",
            profile_url="",
            consumer_key="",
        )

    def test_api_read(self):
        """Test reading a source through the API detail endpoint as an admin user"""
        self.client.force_login(create_test_admin_user())
        response = self.client.get(
            reverse(
                "authentik_api:oauthsource-detail",
                kwargs={
                    "slug": self.source.slug,
                },
            )
        )
        self.assertEqual(response.status_code, 200)

    def test_api_validate(self):
        """Test API validation without OIDC discovery"""
        # A "google" source validates with both OIDC URLs left blank.
        self.assertTrue(
            OAuthSourceSerializer(
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "google",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                    "oidc_well_known_url": "",
                    "oidc_jwks_url": "",
                }
            ).is_valid()
        )
        # An "openidconnect" source with no URLs in the payload is rejected.
        self.assertFalse(
            OAuthSourceSerializer(
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "openidconnect",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                }
            ).is_valid()
        )

    def test_api_validate_openid_connect(self):
        """Test API validation (with OIDC endpoints)"""
        openid_config = {
            "issuer": "foo",
            "authorization_endpoint": "http://mock/oauth/authorize",
            "token_endpoint": "http://mock/oauth/token",
            "userinfo_endpoint": "http://mock/oauth/userinfo",
            "jwks_uri": "http://mock/oauth/discovery/keys",
        }
        jwks_config = {"keys": []}
        with Mocker() as mocker:
            url = "http://mock/.well-known/openid-configuration"
            mocker.get(url, json=openid_config)
            mocker.get(openid_config["jwks_uri"], json=jwks_config)
            serializer = OAuthSourceSerializer(
                instance=self.source,
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "openidconnect",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                    "oidc_well_known_url": url,
                    "oidc_jwks_url": "",
                },
            )
            self.assertTrue(serializer.is_valid())
            # The validated data must carry the values served by the mocked endpoints.
            self.assertEqual(
                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
            )
            self.assertEqual(
                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
            )
            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
            self.assertEqual(
                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
            )
            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)

    def test_api_validate_openid_connect_invalid(self):
        """Test API validation (with an OIDC well-known URL returning an empty document)"""
        openid_config = {}
        with Mocker() as mocker:
            url = "http://mock/.well-known/openid-configuration"
            mocker.get(url, json=openid_config)
            serializer = OAuthSourceSerializer(
                instance=self.source,
                data={
                    "name": "foo",
                    "slug": "bar",
                    "provider_type": "openidconnect",
                    "consumer_key": "foo",
                    "consumer_secret": "foo",
                    "authorization_url": "http://foo",
                    "access_token_url": "http://foo",
                    "profile_url": "http://foo",
                    "oidc_well_known_url": url,
                    "oidc_jwks_url": "",
                },
            )
            self.assertFalse(serializer.is_valid())

    def test_source_redirect_login_hint_user(self):
        """test redirect view with login hint taken from the pending user's email"""
        user = User(email="foo@authentik.company")
        session = self.client.session
        plan = FlowPlan(generate_id())
        plan.context[PLAN_CONTEXT_PENDING_USER] = user
        session[SESSION_KEY_PLAN] = plan
        session.save()

        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])
        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

    def test_source_redirect_login_hint_user_identifier(self):
        """test redirect view with login hint taken from the pending user identifier"""
        session = self.client.session
        plan = FlowPlan(generate_id())
        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
        session[SESSION_KEY_PLAN] = plan
        session.save()

        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])
        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

    def test_source_redirect(self):
        """test redirect view"""
        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])

        # The state sent to the provider must be the one stored in the session.
        session = self.client.session
        state = session[f"oauth-client-{self.source.name}-request-state"]

        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
        self.assertEqual(qs["response_type"], ["code"])
        self.assertEqual(qs["state"], [state])
        self.assertEqual(qs["scope"], ["email openid profile"])

    def test_source_redirect_pkce(self):
        """test redirect view with PKCE (S256) enabled on the source"""
        self.source.pkce = PKCEMethod.S256
        self.source.save()
        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-login",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
        # Parse only the query part: handed a full URL, parse_qs() would fold everything
        # before the "?" into the name of the first parameter.
        qs = parse_qs(res.url.partition("?")[2])

        session = self.client.session
        state = session[f"oauth-client-{self.source.name}-request-state"]
        verifier = session[SESSION_KEY_OAUTH_PKCE]
        self.assertEqual(len(verifier), 128)
        # The challenge in the URL must be the S256 challenge of the verifier kept in the session.
        challenge = pkce_s256_challenge(verifier)

        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
        self.assertEqual(qs["response_type"], ["code"])
        self.assertEqual(qs["state"], [state])
        self.assertEqual(qs["scope"], ["email openid profile"])
        self.assertEqual(qs["code_challenge"], [challenge])
        self.assertEqual(qs["code_challenge_method"], ["S256"])

    def test_source_callback(self):
        """test callback view called without any query parameters"""
        res = self.client.get(
            reverse(
                "authentik_sources_oauth:oauth-client-callback",
                kwargs={"source_slug": self.source.slug},
            )
        )
        self.assertEqual(res.status_code, 302)
OAuth Source tests
def
setUp(self):
def setUp(self):
    """Create the OAuth source that the tests in this class run against"""
    # Endpoint URLs and the consumer key are left empty on purpose.
    source_fields = {
        "name": "test",
        "slug": "test",
        "provider_type": "openidconnect",
        "authorization_url": "",
        "profile_url": "",
        "consumer_key": "",
    }
    self.source = OAuthSource.objects.create(**source_fields)
Hook method for setting up the test fixture before exercising it.
def
test_api_read(self):
def test_api_read(self):
    """Test reading a source through the API detail endpoint as an admin user"""
    admin = create_test_admin_user()
    self.client.force_login(admin)
    detail_url = reverse(
        "authentik_api:oauthsource-detail",
        kwargs={"slug": self.source.slug},
    )
    response = self.client.get(detail_url)
    self.assertEqual(response.status_code, 200)
Test reading a source
def
test_api_validate(self):
def test_api_validate(self):
    """Test API validation without OIDC discovery"""
    # A "google" source validates with both OIDC URLs left blank.
    google_payload = {
        "name": "foo",
        "slug": "bar",
        "provider_type": "google",
        "consumer_key": "foo",
        "consumer_secret": "foo",
        "oidc_well_known_url": "",
        "oidc_jwks_url": "",
    }
    google_serializer = OAuthSourceSerializer(data=google_payload)
    self.assertTrue(google_serializer.is_valid())

    # An "openidconnect" source with no URLs in the payload is rejected.
    oidc_payload = {
        "name": "foo",
        "slug": "bar",
        "provider_type": "openidconnect",
        "consumer_key": "foo",
        "consumer_secret": "foo",
    }
    oidc_serializer = OAuthSourceSerializer(data=oidc_payload)
    self.assertFalse(oidc_serializer.is_valid())
Test API validation
def
test_api_validate_openid_connect(self):
def test_api_validate_openid_connect(self):
    """Test API validation (with OIDC endpoints served by a mocked well-known URL)"""
    well_known_url = "http://mock/.well-known/openid-configuration"
    discovery = {
        "issuer": "foo",
        "authorization_endpoint": "http://mock/oauth/authorize",
        "token_endpoint": "http://mock/oauth/token",
        "userinfo_endpoint": "http://mock/oauth/userinfo",
        "jwks_uri": "http://mock/oauth/discovery/keys",
    }
    jwks = {"keys": []}
    payload = {
        "name": "foo",
        "slug": "bar",
        "provider_type": "openidconnect",
        "consumer_key": "foo",
        "consumer_secret": "foo",
        "oidc_well_known_url": well_known_url,
        "oidc_jwks_url": "",
    }
    with Mocker() as mocker:
        mocker.get(well_known_url, json=discovery)
        mocker.get(discovery["jwks_uri"], json=jwks)
        serializer = OAuthSourceSerializer(instance=self.source, data=payload)
        self.assertTrue(serializer.is_valid())
        # Each validated field must carry the value served by the mocked endpoints.
        expected = {
            "authorization_url": "http://mock/oauth/authorize",
            "access_token_url": "http://mock/oauth/token",
            "profile_url": "http://mock/oauth/userinfo",
            "oidc_jwks_url": "http://mock/oauth/discovery/keys",
            "oidc_jwks": jwks,
        }
        for field, value in expected.items():
            self.assertEqual(serializer.validated_data[field], value)
Test API validation (with OIDC endpoints)
def
test_api_validate_openid_connect_invalid(self):
def test_api_validate_openid_connect_invalid(self):
    """Test API validation (with an OIDC well-known URL returning an empty document)"""
    well_known_url = "http://mock/.well-known/openid-configuration"
    payload = {
        "name": "foo",
        "slug": "bar",
        "provider_type": "openidconnect",
        "consumer_key": "foo",
        "consumer_secret": "foo",
        "authorization_url": "http://foo",
        "access_token_url": "http://foo",
        "profile_url": "http://foo",
        "oidc_well_known_url": well_known_url,
        "oidc_jwks_url": "",
    }
    with Mocker() as mocker:
        # The mocked well-known endpoint answers with an empty JSON object.
        mocker.get(well_known_url, json={})
        serializer = OAuthSourceSerializer(instance=self.source, data=payload)
        self.assertFalse(serializer.is_valid())
Test API validation (with an OIDC well-known URL returning an empty document)
def
test_source_redirect_login_hint_user(self):
def test_source_redirect_login_hint_user(self):
    """test redirect view with login hint taken from the pending user's email"""
    user = User(email="foo@authentik.company")
    session = self.client.session
    plan = FlowPlan(generate_id())
    plan.context[PLAN_CONTEXT_PENDING_USER] = user
    session[SESSION_KEY_PLAN] = plan
    session.save()

    res = self.client.get(
        reverse(
            "authentik_sources_oauth:oauth-client-login",
            kwargs={"source_slug": self.source.slug},
        )
    )
    self.assertEqual(res.status_code, 302)
    # Parse only the query part: handed a full URL, parse_qs() would fold everything
    # before the "?" into the name of the first parameter.
    qs = parse_qs(res.url.partition("?")[2])
    self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
test redirect view with login hint
def
test_source_redirect_login_hint_user_identifier(self):
def test_source_redirect_login_hint_user_identifier(self):
    """test redirect view with login hint taken from the pending user identifier"""
    session = self.client.session
    plan = FlowPlan(generate_id())
    plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
    session[SESSION_KEY_PLAN] = plan
    session.save()

    res = self.client.get(
        reverse(
            "authentik_sources_oauth:oauth-client-login",
            kwargs={"source_slug": self.source.slug},
        )
    )
    self.assertEqual(res.status_code, 302)
    # Parse only the query part: handed a full URL, parse_qs() would fold everything
    # before the "?" into the name of the first parameter.
    qs = parse_qs(res.url.partition("?")[2])
    self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
test redirect view with login hint taken from the pending user identifier
def
test_source_redirect(self):
def test_source_redirect(self):
    """test redirect view"""
    res = self.client.get(
        reverse(
            "authentik_sources_oauth:oauth-client-login",
            kwargs={"source_slug": self.source.slug},
        )
    )
    self.assertEqual(res.status_code, 302)
    # Parse only the query part: handed a full URL, parse_qs() would fold everything
    # before the "?" into the name of the first parameter.
    qs = parse_qs(res.url.partition("?")[2])

    # The state sent to the provider must be the one stored in the session.
    session = self.client.session
    state = session[f"oauth-client-{self.source.name}-request-state"]

    self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
    self.assertEqual(qs["response_type"], ["code"])
    self.assertEqual(qs["state"], [state])
    self.assertEqual(qs["scope"], ["email openid profile"])
test redirect view
def
test_source_redirect_pkce(self):
def test_source_redirect_pkce(self):
    """test redirect view with PKCE (S256) enabled on the source"""
    self.source.pkce = PKCEMethod.S256
    self.source.save()
    res = self.client.get(
        reverse(
            "authentik_sources_oauth:oauth-client-login",
            kwargs={"source_slug": self.source.slug},
        )
    )
    self.assertEqual(res.status_code, 302)
    # Parse only the query part: handed a full URL, parse_qs() would fold everything
    # before the "?" into the name of the first parameter.
    qs = parse_qs(res.url.partition("?")[2])

    session = self.client.session
    state = session[f"oauth-client-{self.source.name}-request-state"]
    verifier = session[SESSION_KEY_OAUTH_PKCE]
    self.assertEqual(len(verifier), 128)
    # The challenge in the URL must be the S256 challenge of the verifier kept in the session.
    challenge = pkce_s256_challenge(verifier)

    self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
    self.assertEqual(qs["response_type"], ["code"])
    self.assertEqual(qs["state"], [state])
    self.assertEqual(qs["scope"], ["email openid profile"])
    self.assertEqual(qs["code_challenge"], [challenge])
    self.assertEqual(qs["code_challenge_method"], ["S256"])
test redirect view with PKCE (S256) enabled on the source
def
test_source_callback(self):
def test_source_callback(self):
    """test callback view called without any query parameters"""
    callback_url = reverse(
        "authentik_sources_oauth:oauth-client-callback",
        kwargs={"source_slug": self.source.slug},
    )
    res = self.client.get(callback_url)
    self.assertEqual(res.status_code, 302)
test callback view