authentik.sources.oauth.tests.test_views

OAuth Source tests

  1"""OAuth Source tests"""
  2
from urllib.parse import parse_qs, urlsplit

from django.urls import reverse
from requests_mock import Mocker
from rest_framework.test import APITestCase

from authentik.core.models import User
from authentik.core.tests.utils import create_test_admin_user
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.utils import pkce_s256_challenge
from authentik.sources.oauth.api.source import OAuthSourceSerializer
from authentik.sources.oauth.clients.oauth2 import SESSION_KEY_OAUTH_PKCE
from authentik.sources.oauth.models import OAuthSource, PKCEMethod
 19
 20
 21class TestOAuthSource(APITestCase):
 22    """OAuth Source tests"""
 23
 24    def setUp(self):
 25        self.source = OAuthSource.objects.create(
 26            name="test",
 27            slug="test",
 28            provider_type="openidconnect",
 29            authorization_url="",
 30            profile_url="",
 31            consumer_key="",
 32        )
 33
 34    def test_api_read(self):
 35        """Test reading a source"""
 36        self.client.force_login(create_test_admin_user())
 37        response = self.client.get(
 38            reverse(
 39                "authentik_api:oauthsource-detail",
 40                kwargs={
 41                    "slug": self.source.slug,
 42                },
 43            )
 44        )
 45        self.assertEqual(response.status_code, 200)
 46
 47    def test_api_validate(self):
 48        """Test API validation"""
 49        self.assertTrue(
 50            OAuthSourceSerializer(
 51                data={
 52                    "name": "foo",
 53                    "slug": "bar",
 54                    "provider_type": "google",
 55                    "consumer_key": "foo",
 56                    "consumer_secret": "foo",
 57                    "oidc_well_known_url": "",
 58                    "oidc_jwks_url": "",
 59                }
 60            ).is_valid()
 61        )
 62        self.assertFalse(
 63            OAuthSourceSerializer(
 64                data={
 65                    "name": "foo",
 66                    "slug": "bar",
 67                    "provider_type": "openidconnect",
 68                    "consumer_key": "foo",
 69                    "consumer_secret": "foo",
 70                }
 71            ).is_valid()
 72        )
 73
 74    def test_api_validate_openid_connect(self):
 75        """Test API validation (with OIDC endpoints)"""
 76        openid_config = {
 77            "issuer": "foo",
 78            "authorization_endpoint": "http://mock/oauth/authorize",
 79            "token_endpoint": "http://mock/oauth/token",
 80            "userinfo_endpoint": "http://mock/oauth/userinfo",
 81            "jwks_uri": "http://mock/oauth/discovery/keys",
 82            "code_challenge_methods_supported": ["S256"],
 83        }
 84        jwks_config = {"keys": []}
 85        with Mocker() as mocker:
 86            url = "http://mock/.well-known/openid-configuration"
 87            mocker.get(url, json=openid_config)
 88            mocker.get(openid_config["jwks_uri"], json=jwks_config)
 89            serializer = OAuthSourceSerializer(
 90                instance=self.source,
 91                data={
 92                    "name": "foo",
 93                    "slug": "bar",
 94                    "provider_type": "openidconnect",
 95                    "consumer_key": "foo",
 96                    "consumer_secret": "foo",
 97                    "oidc_well_known_url": url,
 98                    "oidc_jwks_url": "",
 99                },
100            )
101            self.assertTrue(serializer.is_valid())
102            self.assertEqual(
103                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
104            )
105            self.assertEqual(
106                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
107            )
108            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
109            self.assertEqual(
110                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
111            )
112            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)
113            self.assertEqual(serializer.validated_data["pkce"], PKCEMethod.S256)
114
115    def test_api_validate_openid_connect_invalid(self):
116        """Test API validation (with OIDC endpoints)"""
117        openid_config = {}
118        with Mocker() as mocker:
119            url = "http://mock/.well-known/openid-configuration"
120            mocker.get(url, json=openid_config)
121            serializer = OAuthSourceSerializer(
122                instance=self.source,
123                data={
124                    "name": "foo",
125                    "slug": "bar",
126                    "provider_type": "openidconnect",
127                    "consumer_key": "foo",
128                    "consumer_secret": "foo",
129                    "authorization_url": "http://foo",
130                    "access_token_url": "http://foo",
131                    "profile_url": "http://foo",
132                    "oidc_well_known_url": url,
133                    "oidc_jwks_url": "",
134                },
135            )
136            self.assertFalse(serializer.is_valid())
137
138    def test_source_redirect_login_hint_user(self):
139        """test redirect view with login hint"""
140        user = User(email="foo@authentik.company")
141        session = self.client.session
142        plan = FlowPlan(generate_id())
143        plan.context[PLAN_CONTEXT_PENDING_USER] = user
144        session[SESSION_KEY_PLAN] = plan
145        session.save()
146
147        res = self.client.get(
148            reverse(
149                "authentik_sources_oauth:oauth-client-login",
150                kwargs={"source_slug": self.source.slug},
151            )
152        )
153        self.assertEqual(res.status_code, 302)
154        qs = parse_qs(res.url)
155        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
156
157    def test_source_redirect_login_hint_user_identifier(self):
158        """test redirect view with login hint"""
159        session = self.client.session
160        plan = FlowPlan(generate_id())
161        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
162        session[SESSION_KEY_PLAN] = plan
163        session.save()
164
165        res = self.client.get(
166            reverse(
167                "authentik_sources_oauth:oauth-client-login",
168                kwargs={"source_slug": self.source.slug},
169            )
170        )
171        self.assertEqual(res.status_code, 302)
172        qs = parse_qs(res.url)
173        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
174
175    def test_source_redirect(self):
176        """test redirect view"""
177        res = self.client.get(
178            reverse(
179                "authentik_sources_oauth:oauth-client-login",
180                kwargs={"source_slug": self.source.slug},
181            )
182        )
183        self.assertEqual(res.status_code, 302)
184        qs = parse_qs(res.url)
185
186        session = self.client.session
187        state = session[f"oauth-client-{self.source.name}-request-state"]
188
189        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
190        self.assertEqual(qs["response_type"], ["code"])
191        self.assertEqual(qs["state"], [state])
192        self.assertEqual(qs["scope"], ["email openid profile"])
193
194    def test_source_redirect_pkce(self):
195        """test redirect view"""
196        self.source.pkce = PKCEMethod.S256
197        self.source.save()
198        res = self.client.get(
199            reverse(
200                "authentik_sources_oauth:oauth-client-login",
201                kwargs={"source_slug": self.source.slug},
202            )
203        )
204        self.assertEqual(res.status_code, 302)
205        qs = parse_qs(res.url)
206
207        session = self.client.session
208        state = session[f"oauth-client-{self.source.name}-request-state"]
209        verifier = session[SESSION_KEY_OAUTH_PKCE]
210        self.assertEqual(len(verifier), 128)
211        challenge = pkce_s256_challenge(verifier)
212
213        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
214        self.assertEqual(qs["response_type"], ["code"])
215        self.assertEqual(qs["state"], [state])
216        self.assertEqual(qs["scope"], ["email openid profile"])
217        self.assertEqual(qs["code_challenge"], [challenge])
218        self.assertEqual(qs["code_challenge_method"], ["S256"])
219
220    def test_source_callback(self):
221        """test callback view"""
222        res = self.client.get(
223            reverse(
224                "authentik_sources_oauth:oauth-client-callback",
225                kwargs={"source_slug": self.source.slug},
226            )
227        )
228        self.assertEqual(res.status_code, 302)
class TestOAuthSource(rest_framework.test.APITestCase):
 22class TestOAuthSource(APITestCase):
 23    """OAuth Source tests"""
 24
 25    def setUp(self):
 26        self.source = OAuthSource.objects.create(
 27            name="test",
 28            slug="test",
 29            provider_type="openidconnect",
 30            authorization_url="",
 31            profile_url="",
 32            consumer_key="",
 33        )
 34
 35    def test_api_read(self):
 36        """Test reading a source"""
 37        self.client.force_login(create_test_admin_user())
 38        response = self.client.get(
 39            reverse(
 40                "authentik_api:oauthsource-detail",
 41                kwargs={
 42                    "slug": self.source.slug,
 43                },
 44            )
 45        )
 46        self.assertEqual(response.status_code, 200)
 47
 48    def test_api_validate(self):
 49        """Test API validation"""
 50        self.assertTrue(
 51            OAuthSourceSerializer(
 52                data={
 53                    "name": "foo",
 54                    "slug": "bar",
 55                    "provider_type": "google",
 56                    "consumer_key": "foo",
 57                    "consumer_secret": "foo",
 58                    "oidc_well_known_url": "",
 59                    "oidc_jwks_url": "",
 60                }
 61            ).is_valid()
 62        )
 63        self.assertFalse(
 64            OAuthSourceSerializer(
 65                data={
 66                    "name": "foo",
 67                    "slug": "bar",
 68                    "provider_type": "openidconnect",
 69                    "consumer_key": "foo",
 70                    "consumer_secret": "foo",
 71                }
 72            ).is_valid()
 73        )
 74
 75    def test_api_validate_openid_connect(self):
 76        """Test API validation (with OIDC endpoints)"""
 77        openid_config = {
 78            "issuer": "foo",
 79            "authorization_endpoint": "http://mock/oauth/authorize",
 80            "token_endpoint": "http://mock/oauth/token",
 81            "userinfo_endpoint": "http://mock/oauth/userinfo",
 82            "jwks_uri": "http://mock/oauth/discovery/keys",
 83            "code_challenge_methods_supported": ["S256"],
 84        }
 85        jwks_config = {"keys": []}
 86        with Mocker() as mocker:
 87            url = "http://mock/.well-known/openid-configuration"
 88            mocker.get(url, json=openid_config)
 89            mocker.get(openid_config["jwks_uri"], json=jwks_config)
 90            serializer = OAuthSourceSerializer(
 91                instance=self.source,
 92                data={
 93                    "name": "foo",
 94                    "slug": "bar",
 95                    "provider_type": "openidconnect",
 96                    "consumer_key": "foo",
 97                    "consumer_secret": "foo",
 98                    "oidc_well_known_url": url,
 99                    "oidc_jwks_url": "",
100                },
101            )
102            self.assertTrue(serializer.is_valid())
103            self.assertEqual(
104                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
105            )
106            self.assertEqual(
107                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
108            )
109            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
110            self.assertEqual(
111                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
112            )
113            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)
114            self.assertEqual(serializer.validated_data["pkce"], PKCEMethod.S256)
115
116    def test_api_validate_openid_connect_invalid(self):
117        """Test API validation (with OIDC endpoints)"""
118        openid_config = {}
119        with Mocker() as mocker:
120            url = "http://mock/.well-known/openid-configuration"
121            mocker.get(url, json=openid_config)
122            serializer = OAuthSourceSerializer(
123                instance=self.source,
124                data={
125                    "name": "foo",
126                    "slug": "bar",
127                    "provider_type": "openidconnect",
128                    "consumer_key": "foo",
129                    "consumer_secret": "foo",
130                    "authorization_url": "http://foo",
131                    "access_token_url": "http://foo",
132                    "profile_url": "http://foo",
133                    "oidc_well_known_url": url,
134                    "oidc_jwks_url": "",
135                },
136            )
137            self.assertFalse(serializer.is_valid())
138
139    def test_source_redirect_login_hint_user(self):
140        """test redirect view with login hint"""
141        user = User(email="foo@authentik.company")
142        session = self.client.session
143        plan = FlowPlan(generate_id())
144        plan.context[PLAN_CONTEXT_PENDING_USER] = user
145        session[SESSION_KEY_PLAN] = plan
146        session.save()
147
148        res = self.client.get(
149            reverse(
150                "authentik_sources_oauth:oauth-client-login",
151                kwargs={"source_slug": self.source.slug},
152            )
153        )
154        self.assertEqual(res.status_code, 302)
155        qs = parse_qs(res.url)
156        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
157
158    def test_source_redirect_login_hint_user_identifier(self):
159        """test redirect view with login hint"""
160        session = self.client.session
161        plan = FlowPlan(generate_id())
162        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
163        session[SESSION_KEY_PLAN] = plan
164        session.save()
165
166        res = self.client.get(
167            reverse(
168                "authentik_sources_oauth:oauth-client-login",
169                kwargs={"source_slug": self.source.slug},
170            )
171        )
172        self.assertEqual(res.status_code, 302)
173        qs = parse_qs(res.url)
174        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
175
176    def test_source_redirect(self):
177        """test redirect view"""
178        res = self.client.get(
179            reverse(
180                "authentik_sources_oauth:oauth-client-login",
181                kwargs={"source_slug": self.source.slug},
182            )
183        )
184        self.assertEqual(res.status_code, 302)
185        qs = parse_qs(res.url)
186
187        session = self.client.session
188        state = session[f"oauth-client-{self.source.name}-request-state"]
189
190        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
191        self.assertEqual(qs["response_type"], ["code"])
192        self.assertEqual(qs["state"], [state])
193        self.assertEqual(qs["scope"], ["email openid profile"])
194
195    def test_source_redirect_pkce(self):
196        """test redirect view"""
197        self.source.pkce = PKCEMethod.S256
198        self.source.save()
199        res = self.client.get(
200            reverse(
201                "authentik_sources_oauth:oauth-client-login",
202                kwargs={"source_slug": self.source.slug},
203            )
204        )
205        self.assertEqual(res.status_code, 302)
206        qs = parse_qs(res.url)
207
208        session = self.client.session
209        state = session[f"oauth-client-{self.source.name}-request-state"]
210        verifier = session[SESSION_KEY_OAUTH_PKCE]
211        self.assertEqual(len(verifier), 128)
212        challenge = pkce_s256_challenge(verifier)
213
214        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
215        self.assertEqual(qs["response_type"], ["code"])
216        self.assertEqual(qs["state"], [state])
217        self.assertEqual(qs["scope"], ["email openid profile"])
218        self.assertEqual(qs["code_challenge"], [challenge])
219        self.assertEqual(qs["code_challenge_method"], ["S256"])
220
221    def test_source_callback(self):
222        """test callback view"""
223        res = self.client.get(
224            reverse(
225                "authentik_sources_oauth:oauth-client-callback",
226                kwargs={"source_slug": self.source.slug},
227            )
228        )
229        self.assertEqual(res.status_code, 302)

OAuth Source tests

def setUp(self):
25    def setUp(self):
26        self.source = OAuthSource.objects.create(
27            name="test",
28            slug="test",
29            provider_type="openidconnect",
30            authorization_url="",
31            profile_url="",
32            consumer_key="",
33        )

Hook method for setting up the test fixture before exercising it.

def test_api_read(self):
35    def test_api_read(self):
36        """Test reading a source"""
37        self.client.force_login(create_test_admin_user())
38        response = self.client.get(
39            reverse(
40                "authentik_api:oauthsource-detail",
41                kwargs={
42                    "slug": self.source.slug,
43                },
44            )
45        )
46        self.assertEqual(response.status_code, 200)

Test reading a source

def test_api_validate(self):
48    def test_api_validate(self):
49        """Test API validation"""
50        self.assertTrue(
51            OAuthSourceSerializer(
52                data={
53                    "name": "foo",
54                    "slug": "bar",
55                    "provider_type": "google",
56                    "consumer_key": "foo",
57                    "consumer_secret": "foo",
58                    "oidc_well_known_url": "",
59                    "oidc_jwks_url": "",
60                }
61            ).is_valid()
62        )
63        self.assertFalse(
64            OAuthSourceSerializer(
65                data={
66                    "name": "foo",
67                    "slug": "bar",
68                    "provider_type": "openidconnect",
69                    "consumer_key": "foo",
70                    "consumer_secret": "foo",
71                }
72            ).is_valid()
73        )

Test API validation

def test_api_validate_openid_connect(self):
 75    def test_api_validate_openid_connect(self):
 76        """Test API validation (with OIDC endpoints)"""
 77        openid_config = {
 78            "issuer": "foo",
 79            "authorization_endpoint": "http://mock/oauth/authorize",
 80            "token_endpoint": "http://mock/oauth/token",
 81            "userinfo_endpoint": "http://mock/oauth/userinfo",
 82            "jwks_uri": "http://mock/oauth/discovery/keys",
 83            "code_challenge_methods_supported": ["S256"],
 84        }
 85        jwks_config = {"keys": []}
 86        with Mocker() as mocker:
 87            url = "http://mock/.well-known/openid-configuration"
 88            mocker.get(url, json=openid_config)
 89            mocker.get(openid_config["jwks_uri"], json=jwks_config)
 90            serializer = OAuthSourceSerializer(
 91                instance=self.source,
 92                data={
 93                    "name": "foo",
 94                    "slug": "bar",
 95                    "provider_type": "openidconnect",
 96                    "consumer_key": "foo",
 97                    "consumer_secret": "foo",
 98                    "oidc_well_known_url": url,
 99                    "oidc_jwks_url": "",
100                },
101            )
102            self.assertTrue(serializer.is_valid())
103            self.assertEqual(
104                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
105            )
106            self.assertEqual(
107                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
108            )
109            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
110            self.assertEqual(
111                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
112            )
113            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)
114            self.assertEqual(serializer.validated_data["pkce"], PKCEMethod.S256)

Test API validation (with OIDC endpoints)

def test_api_validate_openid_connect_invalid(self):
116    def test_api_validate_openid_connect_invalid(self):
117        """Test API validation (with OIDC endpoints)"""
118        openid_config = {}
119        with Mocker() as mocker:
120            url = "http://mock/.well-known/openid-configuration"
121            mocker.get(url, json=openid_config)
122            serializer = OAuthSourceSerializer(
123                instance=self.source,
124                data={
125                    "name": "foo",
126                    "slug": "bar",
127                    "provider_type": "openidconnect",
128                    "consumer_key": "foo",
129                    "consumer_secret": "foo",
130                    "authorization_url": "http://foo",
131                    "access_token_url": "http://foo",
132                    "profile_url": "http://foo",
133                    "oidc_well_known_url": url,
134                    "oidc_jwks_url": "",
135                },
136            )
137            self.assertFalse(serializer.is_valid())

Test API validation (with OIDC endpoints)

def test_source_redirect_login_hint_user(self):
139    def test_source_redirect_login_hint_user(self):
140        """test redirect view with login hint"""
141        user = User(email="foo@authentik.company")
142        session = self.client.session
143        plan = FlowPlan(generate_id())
144        plan.context[PLAN_CONTEXT_PENDING_USER] = user
145        session[SESSION_KEY_PLAN] = plan
146        session.save()
147
148        res = self.client.get(
149            reverse(
150                "authentik_sources_oauth:oauth-client-login",
151                kwargs={"source_slug": self.source.slug},
152            )
153        )
154        self.assertEqual(res.status_code, 302)
155        qs = parse_qs(res.url)
156        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

test redirect view with login hint

def test_source_redirect_login_hint_user_identifier(self):
158    def test_source_redirect_login_hint_user_identifier(self):
159        """test redirect view with login hint"""
160        session = self.client.session
161        plan = FlowPlan(generate_id())
162        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
163        session[SESSION_KEY_PLAN] = plan
164        session.save()
165
166        res = self.client.get(
167            reverse(
168                "authentik_sources_oauth:oauth-client-login",
169                kwargs={"source_slug": self.source.slug},
170            )
171        )
172        self.assertEqual(res.status_code, 302)
173        qs = parse_qs(res.url)
174        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

test redirect view with login hint

def test_source_redirect(self):
176    def test_source_redirect(self):
177        """test redirect view"""
178        res = self.client.get(
179            reverse(
180                "authentik_sources_oauth:oauth-client-login",
181                kwargs={"source_slug": self.source.slug},
182            )
183        )
184        self.assertEqual(res.status_code, 302)
185        qs = parse_qs(res.url)
186
187        session = self.client.session
188        state = session[f"oauth-client-{self.source.name}-request-state"]
189
190        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
191        self.assertEqual(qs["response_type"], ["code"])
192        self.assertEqual(qs["state"], [state])
193        self.assertEqual(qs["scope"], ["email openid profile"])

test redirect view

def test_source_redirect_pkce(self):
195    def test_source_redirect_pkce(self):
196        """test redirect view"""
197        self.source.pkce = PKCEMethod.S256
198        self.source.save()
199        res = self.client.get(
200            reverse(
201                "authentik_sources_oauth:oauth-client-login",
202                kwargs={"source_slug": self.source.slug},
203            )
204        )
205        self.assertEqual(res.status_code, 302)
206        qs = parse_qs(res.url)
207
208        session = self.client.session
209        state = session[f"oauth-client-{self.source.name}-request-state"]
210        verifier = session[SESSION_KEY_OAUTH_PKCE]
211        self.assertEqual(len(verifier), 128)
212        challenge = pkce_s256_challenge(verifier)
213
214        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
215        self.assertEqual(qs["response_type"], ["code"])
216        self.assertEqual(qs["state"], [state])
217        self.assertEqual(qs["scope"], ["email openid profile"])
218        self.assertEqual(qs["code_challenge"], [challenge])
219        self.assertEqual(qs["code_challenge_method"], ["S256"])

test redirect view

def test_source_callback(self):
221    def test_source_callback(self):
222        """test callback view"""
223        res = self.client.get(
224            reverse(
225                "authentik_sources_oauth:oauth-client-callback",
226                kwargs={"source_slug": self.source.slug},
227            )
228        )
229        self.assertEqual(res.status_code, 302)

test callback view