authentik.providers.proxy.tests
proxy provider tests
1"""proxy provider tests""" 2 3from json import loads 4 5from django.urls import reverse 6from rest_framework.test import APITestCase 7 8from authentik.core.models import Application 9from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow 10from authentik.lib.generators import generate_id 11from authentik.outposts.models import Outpost, OutpostType 12from authentik.providers.oauth2.models import ClientTypes 13from authentik.providers.proxy.models import ProxyMode, ProxyProvider 14 15 16class ProxyProviderTests(APITestCase): 17 """proxy provider tests""" 18 19 def setUp(self) -> None: 20 self.user = create_test_admin_user() 21 self.client.force_login(self.user) 22 23 def test_basic_auth(self): 24 """Test basic_auth_enabled""" 25 response = self.client.post( 26 reverse("authentik_api:proxyprovider-list"), 27 { 28 "name": generate_id(), 29 "mode": ProxyMode.PROXY, 30 "authorization_flow": create_test_flow().pk.hex, 31 "invalidation_flow": create_test_flow().pk.hex, 32 "external_host": "http://localhost", 33 "internal_host": "http://localhost", 34 "basic_auth_enabled": True, 35 "basic_auth_user_attribute": generate_id(), 36 "basic_auth_password_attribute": generate_id(), 37 }, 38 ) 39 self.assertEqual(response.status_code, 201) 40 41 def test_basic_auth_invalid(self): 42 """Test basic_auth_enabled""" 43 response = self.client.post( 44 reverse("authentik_api:proxyprovider-list"), 45 { 46 "name": generate_id(), 47 "mode": ProxyMode.PROXY, 48 "authorization_flow": create_test_flow().pk.hex, 49 "invalidation_flow": create_test_flow().pk.hex, 50 "external_host": "http://localhost", 51 "internal_host": "http://localhost", 52 "basic_auth_enabled": True, 53 }, 54 ) 55 self.assertEqual(response.status_code, 400) 56 self.assertJSONEqual( 57 response.content.decode(), 58 { 59 "basic_auth_enabled": [ 60 "User and password attributes must be set when basic auth is enabled." 61 ] 62 }, 63 ) 64 65 def test_validate(self): 66 """Test validate""" 67 response = self.client.post( 68 reverse("authentik_api:proxyprovider-list"), 69 { 70 "name": generate_id(), 71 "mode": ProxyMode.PROXY, 72 "authorization_flow": create_test_flow().pk.hex, 73 "invalidation_flow": create_test_flow().pk.hex, 74 "external_host": "http://localhost", 75 }, 76 ) 77 self.assertEqual(response.status_code, 400) 78 self.assertJSONEqual( 79 response.content.decode(), 80 {"internal_host": ["Internal host cannot be empty when forward auth is disabled."]}, 81 ) 82 83 def test_create_defaults(self): 84 """Test create""" 85 name = generate_id() 86 response = self.client.post( 87 reverse("authentik_api:proxyprovider-list"), 88 { 89 "name": name, 90 "mode": ProxyMode.PROXY, 91 "authorization_flow": create_test_flow().pk.hex, 92 "invalidation_flow": create_test_flow().pk.hex, 93 "external_host": "http://localhost", 94 "internal_host": "http://localhost", 95 }, 96 ) 97 self.assertEqual(response.status_code, 201) 98 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 99 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 100 101 def test_update_defaults(self): 102 """Test create""" 103 name = generate_id() 104 response = self.client.post( 105 reverse("authentik_api:proxyprovider-list"), 106 { 107 "name": name, 108 "mode": ProxyMode.PROXY, 109 "authorization_flow": create_test_flow().pk.hex, 110 "invalidation_flow": create_test_flow().pk.hex, 111 "external_host": "http://localhost", 112 "internal_host": "http://localhost", 113 }, 114 ) 115 self.assertEqual(response.status_code, 201) 116 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 117 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 118 provider.client_type = ClientTypes.PUBLIC 119 provider.save() 120 response = self.client.put( 121 reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}), 122 { 123 "name": name, 124 "mode": ProxyMode.PROXY, 125 "authorization_flow": create_test_flow().pk.hex, 126 "invalidation_flow": create_test_flow().pk.hex, 127 "external_host": "http://localhost", 128 "internal_host": "http://localhost", 129 }, 130 ) 131 self.assertEqual(response.status_code, 200) 132 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 133 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 134 135 def test_sa_fetch(self): 136 """Test fetching the outpost config as the service account""" 137 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 138 provider = ProxyProvider.objects.create(name=generate_id()) 139 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 140 outpost.providers.add(provider) 141 142 res = self.client.get( 143 reverse("authentik_api:proxyprovideroutpost-list"), 144 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 145 ) 146 body = loads(res.content) 147 self.assertEqual(body["pagination"]["count"], 1) 148 149 def test_sa_perms_cert(self): 150 """Test permissions to access a configured certificate""" 151 cert = create_test_cert() 152 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 153 provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert) 154 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 155 outpost.providers.add(provider) 156 157 res = self.client.get( 158 reverse("authentik_api:proxyprovideroutpost-list"), 159 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 160 ) 161 body = loads(res.content) 162 self.assertEqual(body["pagination"]["count"], 1) 163 cert_id = body["results"][0]["certificate"] 164 self.assertEqual(cert_id, str(cert.pk)) 165 166 res = self.client.get( 167 reverse( 168 "authentik_api:certificatekeypair-view-certificate", 169 kwargs={ 170 "pk": cert_id, 171 }, 172 ), 173 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 174 ) 175 self.assertEqual(res.status_code, 200) 176 # res = self.client.get( 177 # reverse( 178 # "authentik_api:certificatekeypair-view-private-key", 179 # kwargs={ 180 # "pk": cert_id, 181 # }, 182 # ), 183 # HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 184 # ) 185 # self.assertEqual(res.status_code, 200)
class
ProxyProviderTests(rest_framework.test.APITestCase):
17class ProxyProviderTests(APITestCase): 18 """proxy provider tests""" 19 20 def setUp(self) -> None: 21 self.user = create_test_admin_user() 22 self.client.force_login(self.user) 23 24 def test_basic_auth(self): 25 """Test basic_auth_enabled""" 26 response = self.client.post( 27 reverse("authentik_api:proxyprovider-list"), 28 { 29 "name": generate_id(), 30 "mode": ProxyMode.PROXY, 31 "authorization_flow": create_test_flow().pk.hex, 32 "invalidation_flow": create_test_flow().pk.hex, 33 "external_host": "http://localhost", 34 "internal_host": "http://localhost", 35 "basic_auth_enabled": True, 36 "basic_auth_user_attribute": generate_id(), 37 "basic_auth_password_attribute": generate_id(), 38 }, 39 ) 40 self.assertEqual(response.status_code, 201) 41 42 def test_basic_auth_invalid(self): 43 """Test basic_auth_enabled""" 44 response = self.client.post( 45 reverse("authentik_api:proxyprovider-list"), 46 { 47 "name": generate_id(), 48 "mode": ProxyMode.PROXY, 49 "authorization_flow": create_test_flow().pk.hex, 50 "invalidation_flow": create_test_flow().pk.hex, 51 "external_host": "http://localhost", 52 "internal_host": "http://localhost", 53 "basic_auth_enabled": True, 54 }, 55 ) 56 self.assertEqual(response.status_code, 400) 57 self.assertJSONEqual( 58 response.content.decode(), 59 { 60 "basic_auth_enabled": [ 61 "User and password attributes must be set when basic auth is enabled." 62 ] 63 }, 64 ) 65 66 def test_validate(self): 67 """Test validate""" 68 response = self.client.post( 69 reverse("authentik_api:proxyprovider-list"), 70 { 71 "name": generate_id(), 72 "mode": ProxyMode.PROXY, 73 "authorization_flow": create_test_flow().pk.hex, 74 "invalidation_flow": create_test_flow().pk.hex, 75 "external_host": "http://localhost", 76 }, 77 ) 78 self.assertEqual(response.status_code, 400) 79 self.assertJSONEqual( 80 response.content.decode(), 81 {"internal_host": ["Internal host cannot be empty when forward auth is disabled."]}, 82 ) 83 84 def test_create_defaults(self): 85 """Test create""" 86 name = generate_id() 87 response = self.client.post( 88 reverse("authentik_api:proxyprovider-list"), 89 { 90 "name": name, 91 "mode": ProxyMode.PROXY, 92 "authorization_flow": create_test_flow().pk.hex, 93 "invalidation_flow": create_test_flow().pk.hex, 94 "external_host": "http://localhost", 95 "internal_host": "http://localhost", 96 }, 97 ) 98 self.assertEqual(response.status_code, 201) 99 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 100 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 101 102 def test_update_defaults(self): 103 """Test create""" 104 name = generate_id() 105 response = self.client.post( 106 reverse("authentik_api:proxyprovider-list"), 107 { 108 "name": name, 109 "mode": ProxyMode.PROXY, 110 "authorization_flow": create_test_flow().pk.hex, 111 "invalidation_flow": create_test_flow().pk.hex, 112 "external_host": "http://localhost", 113 "internal_host": "http://localhost", 114 }, 115 ) 116 self.assertEqual(response.status_code, 201) 117 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 118 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 119 provider.client_type = ClientTypes.PUBLIC 120 provider.save() 121 response = self.client.put( 122 reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}), 123 { 124 "name": name, 125 "mode": ProxyMode.PROXY, 126 "authorization_flow": create_test_flow().pk.hex, 127 "invalidation_flow": create_test_flow().pk.hex, 128 "external_host": "http://localhost", 129 "internal_host": "http://localhost", 130 }, 131 ) 132 self.assertEqual(response.status_code, 200) 133 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 134 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 135 136 def test_sa_fetch(self): 137 """Test fetching the outpost config as the service account""" 138 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 139 provider = ProxyProvider.objects.create(name=generate_id()) 140 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 141 outpost.providers.add(provider) 142 143 res = self.client.get( 144 reverse("authentik_api:proxyprovideroutpost-list"), 145 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 146 ) 147 body = loads(res.content) 148 self.assertEqual(body["pagination"]["count"], 1) 149 150 def test_sa_perms_cert(self): 151 """Test permissions to access a configured certificate""" 152 cert = create_test_cert() 153 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 154 provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert) 155 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 156 outpost.providers.add(provider) 157 158 res = self.client.get( 159 reverse("authentik_api:proxyprovideroutpost-list"), 160 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 161 ) 162 body = loads(res.content) 163 self.assertEqual(body["pagination"]["count"], 1) 164 cert_id = body["results"][0]["certificate"] 165 self.assertEqual(cert_id, str(cert.pk)) 166 167 res = self.client.get( 168 reverse( 169 "authentik_api:certificatekeypair-view-certificate", 170 kwargs={ 171 "pk": cert_id, 172 }, 173 ), 174 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 175 ) 176 self.assertEqual(res.status_code, 200) 177 # res = self.client.get( 178 # reverse( 179 # "authentik_api:certificatekeypair-view-private-key", 180 # kwargs={ 181 # "pk": cert_id, 182 # }, 183 # ), 184 # HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 185 # ) 186 # self.assertEqual(res.status_code, 200)
proxy provider tests
def
setUp(self) -> None:
20 def setUp(self) -> None: 21 self.user = create_test_admin_user() 22 self.client.force_login(self.user)
Hook method for setting up the test fixture before exercising it.
def
test_basic_auth(self):
24 def test_basic_auth(self): 25 """Test basic_auth_enabled""" 26 response = self.client.post( 27 reverse("authentik_api:proxyprovider-list"), 28 { 29 "name": generate_id(), 30 "mode": ProxyMode.PROXY, 31 "authorization_flow": create_test_flow().pk.hex, 32 "invalidation_flow": create_test_flow().pk.hex, 33 "external_host": "http://localhost", 34 "internal_host": "http://localhost", 35 "basic_auth_enabled": True, 36 "basic_auth_user_attribute": generate_id(), 37 "basic_auth_password_attribute": generate_id(), 38 }, 39 ) 40 self.assertEqual(response.status_code, 201)
Test basic_auth_enabled
def
test_basic_auth_invalid(self):
42 def test_basic_auth_invalid(self): 43 """Test basic_auth_enabled""" 44 response = self.client.post( 45 reverse("authentik_api:proxyprovider-list"), 46 { 47 "name": generate_id(), 48 "mode": ProxyMode.PROXY, 49 "authorization_flow": create_test_flow().pk.hex, 50 "invalidation_flow": create_test_flow().pk.hex, 51 "external_host": "http://localhost", 52 "internal_host": "http://localhost", 53 "basic_auth_enabled": True, 54 }, 55 ) 56 self.assertEqual(response.status_code, 400) 57 self.assertJSONEqual( 58 response.content.decode(), 59 { 60 "basic_auth_enabled": [ 61 "User and password attributes must be set when basic auth is enabled." 62 ] 63 }, 64 )
Test basic_auth_enabled
def
test_validate(self):
66 def test_validate(self): 67 """Test validate""" 68 response = self.client.post( 69 reverse("authentik_api:proxyprovider-list"), 70 { 71 "name": generate_id(), 72 "mode": ProxyMode.PROXY, 73 "authorization_flow": create_test_flow().pk.hex, 74 "invalidation_flow": create_test_flow().pk.hex, 75 "external_host": "http://localhost", 76 }, 77 ) 78 self.assertEqual(response.status_code, 400) 79 self.assertJSONEqual( 80 response.content.decode(), 81 {"internal_host": ["Internal host cannot be empty when forward auth is disabled."]}, 82 )
Test validate
def
test_create_defaults(self):
84 def test_create_defaults(self): 85 """Test create""" 86 name = generate_id() 87 response = self.client.post( 88 reverse("authentik_api:proxyprovider-list"), 89 { 90 "name": name, 91 "mode": ProxyMode.PROXY, 92 "authorization_flow": create_test_flow().pk.hex, 93 "invalidation_flow": create_test_flow().pk.hex, 94 "external_host": "http://localhost", 95 "internal_host": "http://localhost", 96 }, 97 ) 98 self.assertEqual(response.status_code, 201) 99 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 100 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
Test create
def
test_update_defaults(self):
102 def test_update_defaults(self): 103 """Test create""" 104 name = generate_id() 105 response = self.client.post( 106 reverse("authentik_api:proxyprovider-list"), 107 { 108 "name": name, 109 "mode": ProxyMode.PROXY, 110 "authorization_flow": create_test_flow().pk.hex, 111 "invalidation_flow": create_test_flow().pk.hex, 112 "external_host": "http://localhost", 113 "internal_host": "http://localhost", 114 }, 115 ) 116 self.assertEqual(response.status_code, 201) 117 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 118 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) 119 provider.client_type = ClientTypes.PUBLIC 120 provider.save() 121 response = self.client.put( 122 reverse("authentik_api:proxyprovider-detail", kwargs={"pk": provider.pk}), 123 { 124 "name": name, 125 "mode": ProxyMode.PROXY, 126 "authorization_flow": create_test_flow().pk.hex, 127 "invalidation_flow": create_test_flow().pk.hex, 128 "external_host": "http://localhost", 129 "internal_host": "http://localhost", 130 }, 131 ) 132 self.assertEqual(response.status_code, 200) 133 provider: ProxyProvider = ProxyProvider.objects.get(name=name) 134 self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
Test create
def
test_sa_fetch(self):
136 def test_sa_fetch(self): 137 """Test fetching the outpost config as the service account""" 138 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 139 provider = ProxyProvider.objects.create(name=generate_id()) 140 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 141 outpost.providers.add(provider) 142 143 res = self.client.get( 144 reverse("authentik_api:proxyprovideroutpost-list"), 145 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 146 ) 147 body = loads(res.content) 148 self.assertEqual(body["pagination"]["count"], 1)
Test fetching the outpost config as the service account
def
test_sa_perms_cert(self):
150 def test_sa_perms_cert(self): 151 """Test permissions to access a configured certificate""" 152 cert = create_test_cert() 153 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 154 provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert) 155 Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) 156 outpost.providers.add(provider) 157 158 res = self.client.get( 159 reverse("authentik_api:proxyprovideroutpost-list"), 160 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 161 ) 162 body = loads(res.content) 163 self.assertEqual(body["pagination"]["count"], 1) 164 cert_id = body["results"][0]["certificate"] 165 self.assertEqual(cert_id, str(cert.pk)) 166 167 res = self.client.get( 168 reverse( 169 "authentik_api:certificatekeypair-view-certificate", 170 kwargs={ 171 "pk": cert_id, 172 }, 173 ), 174 HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 175 ) 176 self.assertEqual(res.status_code, 200) 177 # res = self.client.get( 178 # reverse( 179 # "authentik_api:certificatekeypair-view-private-key", 180 # kwargs={ 181 # "pk": cert_id, 182 # }, 183 # ), 184 # HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", 185 # ) 186 # self.assertEqual(res.status_code, 200)
Test permissions to access a configured certificate