authentik.sources.oauth.tests.test_views

OAuth Source tests

  1"""OAuth Source tests"""
  2
  3from urllib.parse import parse_qs
  4
  5from django.urls import reverse
  6from requests_mock import Mocker
  7from rest_framework.test import APITestCase
  8
  9from authentik.core.models import User
 10from authentik.core.tests.utils import create_test_admin_user
 11from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 12from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
 13from authentik.flows.views.executor import SESSION_KEY_PLAN
 14from authentik.lib.generators import generate_id
 15from authentik.providers.oauth2.utils import pkce_s256_challenge
 16from authentik.sources.oauth.api.source import OAuthSourceSerializer
 17from authentik.sources.oauth.clients.oauth2 import SESSION_KEY_OAUTH_PKCE
 18from authentik.sources.oauth.models import OAuthSource, PKCEMethod
 19
 20
 21class TestOAuthSource(APITestCase):
 22    """OAuth Source tests"""
 23
 24    def setUp(self):
 25        self.source = OAuthSource.objects.create(
 26            name="test",
 27            slug="test",
 28            provider_type="openidconnect",
 29            authorization_url="",
 30            profile_url="",
 31            consumer_key="",
 32        )
 33
 34    def test_api_read(self):
 35        """Test reading a source"""
 36        self.client.force_login(create_test_admin_user())
 37        response = self.client.get(
 38            reverse(
 39                "authentik_api:oauthsource-detail",
 40                kwargs={
 41                    "slug": self.source.slug,
 42                },
 43            )
 44        )
 45        self.assertEqual(response.status_code, 200)
 46
 47    def test_api_validate(self):
 48        """Test API validation"""
 49        self.assertTrue(
 50            OAuthSourceSerializer(
 51                data={
 52                    "name": "foo",
 53                    "slug": "bar",
 54                    "provider_type": "google",
 55                    "consumer_key": "foo",
 56                    "consumer_secret": "foo",
 57                    "oidc_well_known_url": "",
 58                    "oidc_jwks_url": "",
 59                }
 60            ).is_valid()
 61        )
 62        self.assertFalse(
 63            OAuthSourceSerializer(
 64                data={
 65                    "name": "foo",
 66                    "slug": "bar",
 67                    "provider_type": "openidconnect",
 68                    "consumer_key": "foo",
 69                    "consumer_secret": "foo",
 70                }
 71            ).is_valid()
 72        )
 73
 74    def test_api_validate_openid_connect(self):
 75        """Test API validation (with OIDC endpoints)"""
 76        openid_config = {
 77            "issuer": "foo",
 78            "authorization_endpoint": "http://mock/oauth/authorize",
 79            "token_endpoint": "http://mock/oauth/token",
 80            "userinfo_endpoint": "http://mock/oauth/userinfo",
 81            "jwks_uri": "http://mock/oauth/discovery/keys",
 82        }
 83        jwks_config = {"keys": []}
 84        with Mocker() as mocker:
 85            url = "http://mock/.well-known/openid-configuration"
 86            mocker.get(url, json=openid_config)
 87            mocker.get(openid_config["jwks_uri"], json=jwks_config)
 88            serializer = OAuthSourceSerializer(
 89                instance=self.source,
 90                data={
 91                    "name": "foo",
 92                    "slug": "bar",
 93                    "provider_type": "openidconnect",
 94                    "consumer_key": "foo",
 95                    "consumer_secret": "foo",
 96                    "oidc_well_known_url": url,
 97                    "oidc_jwks_url": "",
 98                },
 99            )
100            self.assertTrue(serializer.is_valid())
101            self.assertEqual(
102                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
103            )
104            self.assertEqual(
105                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
106            )
107            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
108            self.assertEqual(
109                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
110            )
111            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)
112
113    def test_api_validate_openid_connect_invalid(self):
114        """Test API validation (with OIDC endpoints)"""
115        openid_config = {}
116        with Mocker() as mocker:
117            url = "http://mock/.well-known/openid-configuration"
118            mocker.get(url, json=openid_config)
119            serializer = OAuthSourceSerializer(
120                instance=self.source,
121                data={
122                    "name": "foo",
123                    "slug": "bar",
124                    "provider_type": "openidconnect",
125                    "consumer_key": "foo",
126                    "consumer_secret": "foo",
127                    "authorization_url": "http://foo",
128                    "access_token_url": "http://foo",
129                    "profile_url": "http://foo",
130                    "oidc_well_known_url": url,
131                    "oidc_jwks_url": "",
132                },
133            )
134            self.assertFalse(serializer.is_valid())
135
136    def test_source_redirect_login_hint_user(self):
137        """test redirect view with login hint"""
138        user = User(email="foo@authentik.company")
139        session = self.client.session
140        plan = FlowPlan(generate_id())
141        plan.context[PLAN_CONTEXT_PENDING_USER] = user
142        session[SESSION_KEY_PLAN] = plan
143        session.save()
144
145        res = self.client.get(
146            reverse(
147                "authentik_sources_oauth:oauth-client-login",
148                kwargs={"source_slug": self.source.slug},
149            )
150        )
151        self.assertEqual(res.status_code, 302)
152        qs = parse_qs(res.url)
153        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
154
155    def test_source_redirect_login_hint_user_identifier(self):
156        """test redirect view with login hint"""
157        session = self.client.session
158        plan = FlowPlan(generate_id())
159        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
160        session[SESSION_KEY_PLAN] = plan
161        session.save()
162
163        res = self.client.get(
164            reverse(
165                "authentik_sources_oauth:oauth-client-login",
166                kwargs={"source_slug": self.source.slug},
167            )
168        )
169        self.assertEqual(res.status_code, 302)
170        qs = parse_qs(res.url)
171        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
172
173    def test_source_redirect(self):
174        """test redirect view"""
175        res = self.client.get(
176            reverse(
177                "authentik_sources_oauth:oauth-client-login",
178                kwargs={"source_slug": self.source.slug},
179            )
180        )
181        self.assertEqual(res.status_code, 302)
182        qs = parse_qs(res.url)
183
184        session = self.client.session
185        state = session[f"oauth-client-{self.source.name}-request-state"]
186
187        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
188        self.assertEqual(qs["response_type"], ["code"])
189        self.assertEqual(qs["state"], [state])
190        self.assertEqual(qs["scope"], ["email openid profile"])
191
192    def test_source_redirect_pkce(self):
193        """test redirect view"""
194        self.source.pkce = PKCEMethod.S256
195        self.source.save()
196        res = self.client.get(
197            reverse(
198                "authentik_sources_oauth:oauth-client-login",
199                kwargs={"source_slug": self.source.slug},
200            )
201        )
202        self.assertEqual(res.status_code, 302)
203        qs = parse_qs(res.url)
204
205        session = self.client.session
206        state = session[f"oauth-client-{self.source.name}-request-state"]
207        verifier = session[SESSION_KEY_OAUTH_PKCE]
208        self.assertEqual(len(verifier), 128)
209        challenge = pkce_s256_challenge(verifier)
210
211        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
212        self.assertEqual(qs["response_type"], ["code"])
213        self.assertEqual(qs["state"], [state])
214        self.assertEqual(qs["scope"], ["email openid profile"])
215        self.assertEqual(qs["code_challenge"], [challenge])
216        self.assertEqual(qs["code_challenge_method"], ["S256"])
217
218    def test_source_callback(self):
219        """test callback view"""
220        res = self.client.get(
221            reverse(
222                "authentik_sources_oauth:oauth-client-callback",
223                kwargs={"source_slug": self.source.slug},
224            )
225        )
226        self.assertEqual(res.status_code, 302)
class TestOAuthSource(rest_framework.test.APITestCase):
 22class TestOAuthSource(APITestCase):
 23    """OAuth Source tests"""
 24
 25    def setUp(self):
 26        self.source = OAuthSource.objects.create(
 27            name="test",
 28            slug="test",
 29            provider_type="openidconnect",
 30            authorization_url="",
 31            profile_url="",
 32            consumer_key="",
 33        )
 34
 35    def test_api_read(self):
 36        """Test reading a source"""
 37        self.client.force_login(create_test_admin_user())
 38        response = self.client.get(
 39            reverse(
 40                "authentik_api:oauthsource-detail",
 41                kwargs={
 42                    "slug": self.source.slug,
 43                },
 44            )
 45        )
 46        self.assertEqual(response.status_code, 200)
 47
 48    def test_api_validate(self):
 49        """Test API validation"""
 50        self.assertTrue(
 51            OAuthSourceSerializer(
 52                data={
 53                    "name": "foo",
 54                    "slug": "bar",
 55                    "provider_type": "google",
 56                    "consumer_key": "foo",
 57                    "consumer_secret": "foo",
 58                    "oidc_well_known_url": "",
 59                    "oidc_jwks_url": "",
 60                }
 61            ).is_valid()
 62        )
 63        self.assertFalse(
 64            OAuthSourceSerializer(
 65                data={
 66                    "name": "foo",
 67                    "slug": "bar",
 68                    "provider_type": "openidconnect",
 69                    "consumer_key": "foo",
 70                    "consumer_secret": "foo",
 71                }
 72            ).is_valid()
 73        )
 74
 75    def test_api_validate_openid_connect(self):
 76        """Test API validation (with OIDC endpoints)"""
 77        openid_config = {
 78            "issuer": "foo",
 79            "authorization_endpoint": "http://mock/oauth/authorize",
 80            "token_endpoint": "http://mock/oauth/token",
 81            "userinfo_endpoint": "http://mock/oauth/userinfo",
 82            "jwks_uri": "http://mock/oauth/discovery/keys",
 83        }
 84        jwks_config = {"keys": []}
 85        with Mocker() as mocker:
 86            url = "http://mock/.well-known/openid-configuration"
 87            mocker.get(url, json=openid_config)
 88            mocker.get(openid_config["jwks_uri"], json=jwks_config)
 89            serializer = OAuthSourceSerializer(
 90                instance=self.source,
 91                data={
 92                    "name": "foo",
 93                    "slug": "bar",
 94                    "provider_type": "openidconnect",
 95                    "consumer_key": "foo",
 96                    "consumer_secret": "foo",
 97                    "oidc_well_known_url": url,
 98                    "oidc_jwks_url": "",
 99                },
100            )
101            self.assertTrue(serializer.is_valid())
102            self.assertEqual(
103                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
104            )
105            self.assertEqual(
106                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
107            )
108            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
109            self.assertEqual(
110                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
111            )
112            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)
113
114    def test_api_validate_openid_connect_invalid(self):
115        """Test API validation (with OIDC endpoints)"""
116        openid_config = {}
117        with Mocker() as mocker:
118            url = "http://mock/.well-known/openid-configuration"
119            mocker.get(url, json=openid_config)
120            serializer = OAuthSourceSerializer(
121                instance=self.source,
122                data={
123                    "name": "foo",
124                    "slug": "bar",
125                    "provider_type": "openidconnect",
126                    "consumer_key": "foo",
127                    "consumer_secret": "foo",
128                    "authorization_url": "http://foo",
129                    "access_token_url": "http://foo",
130                    "profile_url": "http://foo",
131                    "oidc_well_known_url": url,
132                    "oidc_jwks_url": "",
133                },
134            )
135            self.assertFalse(serializer.is_valid())
136
137    def test_source_redirect_login_hint_user(self):
138        """test redirect view with login hint"""
139        user = User(email="foo@authentik.company")
140        session = self.client.session
141        plan = FlowPlan(generate_id())
142        plan.context[PLAN_CONTEXT_PENDING_USER] = user
143        session[SESSION_KEY_PLAN] = plan
144        session.save()
145
146        res = self.client.get(
147            reverse(
148                "authentik_sources_oauth:oauth-client-login",
149                kwargs={"source_slug": self.source.slug},
150            )
151        )
152        self.assertEqual(res.status_code, 302)
153        qs = parse_qs(res.url)
154        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
155
156    def test_source_redirect_login_hint_user_identifier(self):
157        """test redirect view with login hint"""
158        session = self.client.session
159        plan = FlowPlan(generate_id())
160        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
161        session[SESSION_KEY_PLAN] = plan
162        session.save()
163
164        res = self.client.get(
165            reverse(
166                "authentik_sources_oauth:oauth-client-login",
167                kwargs={"source_slug": self.source.slug},
168            )
169        )
170        self.assertEqual(res.status_code, 302)
171        qs = parse_qs(res.url)
172        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])
173
174    def test_source_redirect(self):
175        """test redirect view"""
176        res = self.client.get(
177            reverse(
178                "authentik_sources_oauth:oauth-client-login",
179                kwargs={"source_slug": self.source.slug},
180            )
181        )
182        self.assertEqual(res.status_code, 302)
183        qs = parse_qs(res.url)
184
185        session = self.client.session
186        state = session[f"oauth-client-{self.source.name}-request-state"]
187
188        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
189        self.assertEqual(qs["response_type"], ["code"])
190        self.assertEqual(qs["state"], [state])
191        self.assertEqual(qs["scope"], ["email openid profile"])
192
193    def test_source_redirect_pkce(self):
194        """test redirect view"""
195        self.source.pkce = PKCEMethod.S256
196        self.source.save()
197        res = self.client.get(
198            reverse(
199                "authentik_sources_oauth:oauth-client-login",
200                kwargs={"source_slug": self.source.slug},
201            )
202        )
203        self.assertEqual(res.status_code, 302)
204        qs = parse_qs(res.url)
205
206        session = self.client.session
207        state = session[f"oauth-client-{self.source.name}-request-state"]
208        verifier = session[SESSION_KEY_OAUTH_PKCE]
209        self.assertEqual(len(verifier), 128)
210        challenge = pkce_s256_challenge(verifier)
211
212        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
213        self.assertEqual(qs["response_type"], ["code"])
214        self.assertEqual(qs["state"], [state])
215        self.assertEqual(qs["scope"], ["email openid profile"])
216        self.assertEqual(qs["code_challenge"], [challenge])
217        self.assertEqual(qs["code_challenge_method"], ["S256"])
218
219    def test_source_callback(self):
220        """test callback view"""
221        res = self.client.get(
222            reverse(
223                "authentik_sources_oauth:oauth-client-callback",
224                kwargs={"source_slug": self.source.slug},
225            )
226        )
227        self.assertEqual(res.status_code, 302)

OAuth Source tests

def setUp(self):
25    def setUp(self):
26        self.source = OAuthSource.objects.create(
27            name="test",
28            slug="test",
29            provider_type="openidconnect",
30            authorization_url="",
31            profile_url="",
32            consumer_key="",
33        )

Hook method for setting up the test fixture before exercising it.

def test_api_read(self):
35    def test_api_read(self):
36        """Test reading a source"""
37        self.client.force_login(create_test_admin_user())
38        response = self.client.get(
39            reverse(
40                "authentik_api:oauthsource-detail",
41                kwargs={
42                    "slug": self.source.slug,
43                },
44            )
45        )
46        self.assertEqual(response.status_code, 200)

Test reading a source

def test_api_validate(self):
48    def test_api_validate(self):
49        """Test API validation"""
50        self.assertTrue(
51            OAuthSourceSerializer(
52                data={
53                    "name": "foo",
54                    "slug": "bar",
55                    "provider_type": "google",
56                    "consumer_key": "foo",
57                    "consumer_secret": "foo",
58                    "oidc_well_known_url": "",
59                    "oidc_jwks_url": "",
60                }
61            ).is_valid()
62        )
63        self.assertFalse(
64            OAuthSourceSerializer(
65                data={
66                    "name": "foo",
67                    "slug": "bar",
68                    "provider_type": "openidconnect",
69                    "consumer_key": "foo",
70                    "consumer_secret": "foo",
71                }
72            ).is_valid()
73        )

Test API validation

def test_api_validate_openid_connect(self):
 75    def test_api_validate_openid_connect(self):
 76        """Test API validation (with OIDC endpoints)"""
 77        openid_config = {
 78            "issuer": "foo",
 79            "authorization_endpoint": "http://mock/oauth/authorize",
 80            "token_endpoint": "http://mock/oauth/token",
 81            "userinfo_endpoint": "http://mock/oauth/userinfo",
 82            "jwks_uri": "http://mock/oauth/discovery/keys",
 83        }
 84        jwks_config = {"keys": []}
 85        with Mocker() as mocker:
 86            url = "http://mock/.well-known/openid-configuration"
 87            mocker.get(url, json=openid_config)
 88            mocker.get(openid_config["jwks_uri"], json=jwks_config)
 89            serializer = OAuthSourceSerializer(
 90                instance=self.source,
 91                data={
 92                    "name": "foo",
 93                    "slug": "bar",
 94                    "provider_type": "openidconnect",
 95                    "consumer_key": "foo",
 96                    "consumer_secret": "foo",
 97                    "oidc_well_known_url": url,
 98                    "oidc_jwks_url": "",
 99                },
100            )
101            self.assertTrue(serializer.is_valid())
102            self.assertEqual(
103                serializer.validated_data["authorization_url"], "http://mock/oauth/authorize"
104            )
105            self.assertEqual(
106                serializer.validated_data["access_token_url"], "http://mock/oauth/token"
107            )
108            self.assertEqual(serializer.validated_data["profile_url"], "http://mock/oauth/userinfo")
109            self.assertEqual(
110                serializer.validated_data["oidc_jwks_url"], "http://mock/oauth/discovery/keys"
111            )
112            self.assertEqual(serializer.validated_data["oidc_jwks"], jwks_config)

Test API validation (with OIDC endpoints)

def test_api_validate_openid_connect_invalid(self):
114    def test_api_validate_openid_connect_invalid(self):
115        """Test API validation (with OIDC endpoints)"""
116        openid_config = {}
117        with Mocker() as mocker:
118            url = "http://mock/.well-known/openid-configuration"
119            mocker.get(url, json=openid_config)
120            serializer = OAuthSourceSerializer(
121                instance=self.source,
122                data={
123                    "name": "foo",
124                    "slug": "bar",
125                    "provider_type": "openidconnect",
126                    "consumer_key": "foo",
127                    "consumer_secret": "foo",
128                    "authorization_url": "http://foo",
129                    "access_token_url": "http://foo",
130                    "profile_url": "http://foo",
131                    "oidc_well_known_url": url,
132                    "oidc_jwks_url": "",
133                },
134            )
135            self.assertFalse(serializer.is_valid())

Test API validation (with OIDC endpoints)

def test_source_redirect_login_hint_user(self):
137    def test_source_redirect_login_hint_user(self):
138        """test redirect view with login hint"""
139        user = User(email="foo@authentik.company")
140        session = self.client.session
141        plan = FlowPlan(generate_id())
142        plan.context[PLAN_CONTEXT_PENDING_USER] = user
143        session[SESSION_KEY_PLAN] = plan
144        session.save()
145
146        res = self.client.get(
147            reverse(
148                "authentik_sources_oauth:oauth-client-login",
149                kwargs={"source_slug": self.source.slug},
150            )
151        )
152        self.assertEqual(res.status_code, 302)
153        qs = parse_qs(res.url)
154        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

test redirect view with login hint

def test_source_redirect_login_hint_user_identifier(self):
156    def test_source_redirect_login_hint_user_identifier(self):
157        """test redirect view with login hint"""
158        session = self.client.session
159        plan = FlowPlan(generate_id())
160        plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = "foo@authentik.company"
161        session[SESSION_KEY_PLAN] = plan
162        session.save()
163
164        res = self.client.get(
165            reverse(
166                "authentik_sources_oauth:oauth-client-login",
167                kwargs={"source_slug": self.source.slug},
168            )
169        )
170        self.assertEqual(res.status_code, 302)
171        qs = parse_qs(res.url)
172        self.assertEqual(qs["login_hint"], ["foo@authentik.company"])

test redirect view with login hint

def test_source_redirect(self):
174    def test_source_redirect(self):
175        """test redirect view"""
176        res = self.client.get(
177            reverse(
178                "authentik_sources_oauth:oauth-client-login",
179                kwargs={"source_slug": self.source.slug},
180            )
181        )
182        self.assertEqual(res.status_code, 302)
183        qs = parse_qs(res.url)
184
185        session = self.client.session
186        state = session[f"oauth-client-{self.source.name}-request-state"]
187
188        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
189        self.assertEqual(qs["response_type"], ["code"])
190        self.assertEqual(qs["state"], [state])
191        self.assertEqual(qs["scope"], ["email openid profile"])

test redirect view

def test_source_redirect_pkce(self):
193    def test_source_redirect_pkce(self):
194        """test redirect view"""
195        self.source.pkce = PKCEMethod.S256
196        self.source.save()
197        res = self.client.get(
198            reverse(
199                "authentik_sources_oauth:oauth-client-login",
200                kwargs={"source_slug": self.source.slug},
201            )
202        )
203        self.assertEqual(res.status_code, 302)
204        qs = parse_qs(res.url)
205
206        session = self.client.session
207        state = session[f"oauth-client-{self.source.name}-request-state"]
208        verifier = session[SESSION_KEY_OAUTH_PKCE]
209        self.assertEqual(len(verifier), 128)
210        challenge = pkce_s256_challenge(verifier)
211
212        self.assertEqual(qs["redirect_uri"], ["http://testserver/source/oauth/callback/test/"])
213        self.assertEqual(qs["response_type"], ["code"])
214        self.assertEqual(qs["state"], [state])
215        self.assertEqual(qs["scope"], ["email openid profile"])
216        self.assertEqual(qs["code_challenge"], [challenge])
217        self.assertEqual(qs["code_challenge_method"], ["S256"])

test redirect view

def test_source_callback(self):
219    def test_source_callback(self):
220        """test callback view"""
221        res = self.client.get(
222            reverse(
223                "authentik_sources_oauth:oauth-client-callback",
224                kwargs={"source_slug": self.source.slug},
225            )
226        )
227        self.assertEqual(res.status_code, 302)

test callback view