authentik.providers.proxy.tests

proxy provider tests

  1"""proxy provider tests"""
  2
  3from json import loads
  4
  5from django.urls import reverse
  6from rest_framework.test import APITestCase
  7
  8from authentik.core.models import Application
  9from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
 10from authentik.lib.generators import generate_id
 11from authentik.outposts.models import Outpost, OutpostType
 12from authentik.providers.oauth2.models import ClientTypes
 13from authentik.providers.proxy.models import ProxyMode, ProxyProvider
 14
 15
 16class ProxyProviderTests(APITestCase):
 17    """proxy provider tests"""
 18
 19    def setUp(self) -> None:
 20        self.user = create_test_admin_user()
 21        self.client.force_login(self.user)
 22
 23    def test_basic_auth(self):
 24        """Test basic_auth_enabled"""
 25        response = self.client.post(
 26            reverse("authentik_api:proxyprovider-list"),
 27            {
 28                "name": generate_id(),
 29                "mode": ProxyMode.PROXY,
 30                "authorization_flow": create_test_flow().pk.hex,
 31                "invalidation_flow": create_test_flow().pk.hex,
 32                "external_host": "http://localhost",
 33                "internal_host": "http://localhost",
 34                "basic_auth_enabled": True,
 35                "basic_auth_user_attribute": generate_id(),
 36                "basic_auth_password_attribute": generate_id(),
 37            },
 38        )
 39        self.assertEqual(response.status_code, 201)
 40
 41    def test_basic_auth_invalid(self):
 42        """Test basic_auth_enabled"""
 43        response = self.client.post(
 44            reverse("authentik_api:proxyprovider-list"),
 45            {
 46                "name": generate_id(),
 47                "mode": ProxyMode.PROXY,
 48                "authorization_flow": create_test_flow().pk.hex,
 49                "invalidation_flow": create_test_flow().pk.hex,
 50                "external_host": "http://localhost",
 51                "internal_host": "http://localhost",
 52                "basic_auth_enabled": True,
 53            },
 54        )
 55        self.assertEqual(response.status_code, 400)
 56        self.assertJSONEqual(
 57            response.content.decode(),
 58            {
 59                "basic_auth_enabled": [
 60                    "User and password attributes must be set when basic auth is enabled."
 61                ]
 62            },
 63        )
 64
 65    def test_validate(self):
 66        """Test validate"""
 67        response = self.client.post(
 68            reverse("authentik_api:proxyprovider-list"),
 69            {
 70                "name": generate_id(),
 71                "mode": ProxyMode.PROXY,
 72                "authorization_flow": create_test_flow().pk.hex,
 73                "invalidation_flow": create_test_flow().pk.hex,
 74                "external_host": "http://localhost",
 75            },
 76        )
 77        self.assertEqual(response.status_code, 400)
 78        self.assertJSONEqual(
 79            response.content.decode(),
 80            {"internal_host": ["Internal host cannot be empty when forward auth is disabled."]},
 81        )
 82
 83    def test_create_defaults(self):
 84        """Test create"""
 85        name = generate_id()
 86        response = self.client.post(
 87            reverse("authentik_api:proxyprovider-list"),
 88            {
 89                "name": name,
 90                "mode": ProxyMode.PROXY,
 91                "authorization_flow": create_test_flow().pk.hex,
 92                "invalidation_flow": create_test_flow().pk.hex,
 93                "external_host": "http://localhost",
 94                "internal_host": "http://localhost",
 95            },
 96        )
 97        self.assertEqual(response.status_code, 201)
 98        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
 99        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
100
101    def test_update_defaults(self):
102        """Test create"""
103        name = generate_id()
104        response = self.client.post(
105            reverse("authentik_api:proxyprovider-list"),
106            {
107                "name": name,
108                "mode": ProxyMode.PROXY,
109                "authorization_flow": create_test_flow().pk.hex,
110                "invalidation_flow": create_test_flow().pk.hex,
111                "external_host": "http://localhost",
112                "internal_host": "http://localhost",
113            },
114        )
115        self.assertEqual(response.status_code, 201)
116        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
117        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
118        provider.client_type = ClientTypes.PUBLIC
119        provider.save()
120        response = self.client.put(
121            reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}),
122            {
123                "name": name,
124                "mode": ProxyMode.PROXY,
125                "authorization_flow": create_test_flow().pk.hex,
126                "invalidation_flow": create_test_flow().pk.hex,
127                "external_host": "http://localhost",
128                "internal_host": "http://localhost",
129            },
130        )
131        self.assertEqual(response.status_code, 200)
132        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
133        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
134
135    def test_sa_fetch(self):
136        """Test fetching the outpost config as the service account"""
137        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
138        provider = ProxyProvider.objects.create(name=generate_id())
139        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
140        outpost.providers.add(provider)
141
142        res = self.client.get(
143            reverse("authentik_api:proxyprovideroutpost-list"),
144            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
145        )
146        body = loads(res.content)
147        self.assertEqual(body["pagination"]["count"], 1)
148
149    def test_sa_perms_cert(self):
150        """Test permissions to access a configured certificate"""
151        cert = create_test_cert()
152        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
153        provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert)
154        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
155        outpost.providers.add(provider)
156
157        res = self.client.get(
158            reverse("authentik_api:proxyprovideroutpost-list"),
159            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
160        )
161        body = loads(res.content)
162        self.assertEqual(body["pagination"]["count"], 1)
163        cert_id = body["results"][0]["certificate"]
164        self.assertEqual(cert_id, str(cert.pk))
165
166        res = self.client.get(
167            reverse(
168                "authentik_api:certificatekeypair-view-certificate",
169                kwargs={
170                    "pk": cert_id,
171                },
172            ),
173            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
174        )
175        self.assertEqual(res.status_code, 200)
176        # res = self.client.get(
177        #     reverse(
178        #         "authentik_api:certificatekeypair-view-private-key",
179        #         kwargs={
180        #             "pk": cert_id,
181        #         },
182        #     ),
183        #     HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
184        # )
185        # self.assertEqual(res.status_code, 200)
class ProxyProviderTests(rest_framework.test.APITestCase):
 17class ProxyProviderTests(APITestCase):
 18    """proxy provider tests"""
 19
 20    def setUp(self) -> None:
 21        self.user = create_test_admin_user()
 22        self.client.force_login(self.user)
 23
 24    def test_basic_auth(self):
 25        """Test basic_auth_enabled"""
 26        response = self.client.post(
 27            reverse("authentik_api:proxyprovider-list"),
 28            {
 29                "name": generate_id(),
 30                "mode": ProxyMode.PROXY,
 31                "authorization_flow": create_test_flow().pk.hex,
 32                "invalidation_flow": create_test_flow().pk.hex,
 33                "external_host": "http://localhost",
 34                "internal_host": "http://localhost",
 35                "basic_auth_enabled": True,
 36                "basic_auth_user_attribute": generate_id(),
 37                "basic_auth_password_attribute": generate_id(),
 38            },
 39        )
 40        self.assertEqual(response.status_code, 201)
 41
 42    def test_basic_auth_invalid(self):
 43        """Test basic_auth_enabled"""
 44        response = self.client.post(
 45            reverse("authentik_api:proxyprovider-list"),
 46            {
 47                "name": generate_id(),
 48                "mode": ProxyMode.PROXY,
 49                "authorization_flow": create_test_flow().pk.hex,
 50                "invalidation_flow": create_test_flow().pk.hex,
 51                "external_host": "http://localhost",
 52                "internal_host": "http://localhost",
 53                "basic_auth_enabled": True,
 54            },
 55        )
 56        self.assertEqual(response.status_code, 400)
 57        self.assertJSONEqual(
 58            response.content.decode(),
 59            {
 60                "basic_auth_enabled": [
 61                    "User and password attributes must be set when basic auth is enabled."
 62                ]
 63            },
 64        )
 65
 66    def test_validate(self):
 67        """Test validate"""
 68        response = self.client.post(
 69            reverse("authentik_api:proxyprovider-list"),
 70            {
 71                "name": generate_id(),
 72                "mode": ProxyMode.PROXY,
 73                "authorization_flow": create_test_flow().pk.hex,
 74                "invalidation_flow": create_test_flow().pk.hex,
 75                "external_host": "http://localhost",
 76            },
 77        )
 78        self.assertEqual(response.status_code, 400)
 79        self.assertJSONEqual(
 80            response.content.decode(),
 81            {"internal_host": ["Internal host cannot be empty when forward auth is disabled."]},
 82        )
 83
 84    def test_create_defaults(self):
 85        """Test create"""
 86        name = generate_id()
 87        response = self.client.post(
 88            reverse("authentik_api:proxyprovider-list"),
 89            {
 90                "name": name,
 91                "mode": ProxyMode.PROXY,
 92                "authorization_flow": create_test_flow().pk.hex,
 93                "invalidation_flow": create_test_flow().pk.hex,
 94                "external_host": "http://localhost",
 95                "internal_host": "http://localhost",
 96            },
 97        )
 98        self.assertEqual(response.status_code, 201)
 99        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
100        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
101
102    def test_update_defaults(self):
103        """Test create"""
104        name = generate_id()
105        response = self.client.post(
106            reverse("authentik_api:proxyprovider-list"),
107            {
108                "name": name,
109                "mode": ProxyMode.PROXY,
110                "authorization_flow": create_test_flow().pk.hex,
111                "invalidation_flow": create_test_flow().pk.hex,
112                "external_host": "http://localhost",
113                "internal_host": "http://localhost",
114            },
115        )
116        self.assertEqual(response.status_code, 201)
117        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
118        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
119        provider.client_type = ClientTypes.PUBLIC
120        provider.save()
121        response = self.client.put(
122            reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}),
123            {
124                "name": name,
125                "mode": ProxyMode.PROXY,
126                "authorization_flow": create_test_flow().pk.hex,
127                "invalidation_flow": create_test_flow().pk.hex,
128                "external_host": "http://localhost",
129                "internal_host": "http://localhost",
130            },
131        )
132        self.assertEqual(response.status_code, 200)
133        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
134        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
135
136    def test_sa_fetch(self):
137        """Test fetching the outpost config as the service account"""
138        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
139        provider = ProxyProvider.objects.create(name=generate_id())
140        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
141        outpost.providers.add(provider)
142
143        res = self.client.get(
144            reverse("authentik_api:proxyprovideroutpost-list"),
145            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
146        )
147        body = loads(res.content)
148        self.assertEqual(body["pagination"]["count"], 1)
149
150    def test_sa_perms_cert(self):
151        """Test permissions to access a configured certificate"""
152        cert = create_test_cert()
153        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
154        provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert)
155        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
156        outpost.providers.add(provider)
157
158        res = self.client.get(
159            reverse("authentik_api:proxyprovideroutpost-list"),
160            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
161        )
162        body = loads(res.content)
163        self.assertEqual(body["pagination"]["count"], 1)
164        cert_id = body["results"][0]["certificate"]
165        self.assertEqual(cert_id, str(cert.pk))
166
167        res = self.client.get(
168            reverse(
169                "authentik_api:certificatekeypair-view-certificate",
170                kwargs={
171                    "pk": cert_id,
172                },
173            ),
174            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
175        )
176        self.assertEqual(res.status_code, 200)
177        # res = self.client.get(
178        #     reverse(
179        #         "authentik_api:certificatekeypair-view-private-key",
180        #         kwargs={
181        #             "pk": cert_id,
182        #         },
183        #     ),
184        #     HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
185        # )
186        # self.assertEqual(res.status_code, 200)

proxy provider tests

def setUp(self) -> None:
20    def setUp(self) -> None:
21        self.user = create_test_admin_user()
22        self.client.force_login(self.user)

Hook method for setting up the test fixture before exercising it.

def test_basic_auth(self):
24    def test_basic_auth(self):
25        """Test basic_auth_enabled"""
26        response = self.client.post(
27            reverse("authentik_api:proxyprovider-list"),
28            {
29                "name": generate_id(),
30                "mode": ProxyMode.PROXY,
31                "authorization_flow": create_test_flow().pk.hex,
32                "invalidation_flow": create_test_flow().pk.hex,
33                "external_host": "http://localhost",
34                "internal_host": "http://localhost",
35                "basic_auth_enabled": True,
36                "basic_auth_user_attribute": generate_id(),
37                "basic_auth_password_attribute": generate_id(),
38            },
39        )
40        self.assertEqual(response.status_code, 201)

Test basic_auth_enabled

def test_basic_auth_invalid(self):
42    def test_basic_auth_invalid(self):
43        """Test basic_auth_enabled"""
44        response = self.client.post(
45            reverse("authentik_api:proxyprovider-list"),
46            {
47                "name": generate_id(),
48                "mode": ProxyMode.PROXY,
49                "authorization_flow": create_test_flow().pk.hex,
50                "invalidation_flow": create_test_flow().pk.hex,
51                "external_host": "http://localhost",
52                "internal_host": "http://localhost",
53                "basic_auth_enabled": True,
54            },
55        )
56        self.assertEqual(response.status_code, 400)
57        self.assertJSONEqual(
58            response.content.decode(),
59            {
60                "basic_auth_enabled": [
61                    "User and password attributes must be set when basic auth is enabled."
62                ]
63            },
64        )

Test basic_auth_enabled without user and password attributes

def test_validate(self):
66    def test_validate(self):
67        """Test validate"""
68        response = self.client.post(
69            reverse("authentik_api:proxyprovider-list"),
70            {
71                "name": generate_id(),
72                "mode": ProxyMode.PROXY,
73                "authorization_flow": create_test_flow().pk.hex,
74                "invalidation_flow": create_test_flow().pk.hex,
75                "external_host": "http://localhost",
76            },
77        )
78        self.assertEqual(response.status_code, 400)
79        self.assertJSONEqual(
80            response.content.decode(),
81            {"internal_host": ["Internal host cannot be empty when forward auth is disabled."]},
82        )

Test validate

def test_create_defaults(self):
 84    def test_create_defaults(self):
 85        """Test create"""
 86        name = generate_id()
 87        response = self.client.post(
 88            reverse("authentik_api:proxyprovider-list"),
 89            {
 90                "name": name,
 91                "mode": ProxyMode.PROXY,
 92                "authorization_flow": create_test_flow().pk.hex,
 93                "invalidation_flow": create_test_flow().pk.hex,
 94                "external_host": "http://localhost",
 95                "internal_host": "http://localhost",
 96            },
 97        )
 98        self.assertEqual(response.status_code, 201)
 99        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
100        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)

Test create

def test_update_defaults(self):
102    def test_update_defaults(self):
103        """Test create"""
104        name = generate_id()
105        response = self.client.post(
106            reverse("authentik_api:proxyprovider-list"),
107            {
108                "name": name,
109                "mode": ProxyMode.PROXY,
110                "authorization_flow": create_test_flow().pk.hex,
111                "invalidation_flow": create_test_flow().pk.hex,
112                "external_host": "http://localhost",
113                "internal_host": "http://localhost",
114            },
115        )
116        self.assertEqual(response.status_code, 201)
117        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
118        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
119        provider.client_type = ClientTypes.PUBLIC
120        provider.save()
121        response = self.client.put(
122            reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}),
123            {
124                "name": name,
125                "mode": ProxyMode.PROXY,
126                "authorization_flow": create_test_flow().pk.hex,
127                "invalidation_flow": create_test_flow().pk.hex,
128                "external_host": "http://localhost",
129                "internal_host": "http://localhost",
130            },
131        )
132        self.assertEqual(response.status_code, 200)
133        provider: ProxyProvider = ProxyProvider.objects.get(name=name)
134        self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)

Test update

def test_sa_fetch(self):
136    def test_sa_fetch(self):
137        """Test fetching the outpost config as the service account"""
138        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
139        provider = ProxyProvider.objects.create(name=generate_id())
140        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
141        outpost.providers.add(provider)
142
143        res = self.client.get(
144            reverse("authentik_api:proxyprovideroutpost-list"),
145            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
146        )
147        body = loads(res.content)
148        self.assertEqual(body["pagination"]["count"], 1)

Test fetching the outpost config as the service account

def test_sa_perms_cert(self):
150    def test_sa_perms_cert(self):
151        """Test permissions to access a configured certificate"""
152        cert = create_test_cert()
153        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
154        provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert)
155        Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
156        outpost.providers.add(provider)
157
158        res = self.client.get(
159            reverse("authentik_api:proxyprovideroutpost-list"),
160            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
161        )
162        body = loads(res.content)
163        self.assertEqual(body["pagination"]["count"], 1)
164        cert_id = body["results"][0]["certificate"]
165        self.assertEqual(cert_id, str(cert.pk))
166
167        res = self.client.get(
168            reverse(
169                "authentik_api:certificatekeypair-view-certificate",
170                kwargs={
171                    "pk": cert_id,
172                },
173            ),
174            HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
175        )
176        self.assertEqual(res.status_code, 200)
177        # res = self.client.get(
178        #     reverse(
179        #         "authentik_api:certificatekeypair-view-private-key",
180        #         kwargs={
181        #             "pk": cert_id,
182        #         },
183        #     ),
184        #     HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
185        # )
186        # self.assertEqual(res.status_code, 200)

Test permissions to access a configured certificate