authentik.outposts.tests.test_controller_k8s
Kubernetes controller tests
"""Kubernetes controller tests"""

from unittest.mock import MagicMock, patch

from django.test import TestCase
from kubernetes.client import ApiClient
from yaml import SafeLoader, load_all

from authentik.blueprints.tests import reconcile_app
from authentik.lib.generators import generate_id
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.controllers.k8s.deployment import DeploymentReconciler
from authentik.outposts.controllers.k8s.service_monitor import PrometheusServiceMonitorReconciler
from authentik.outposts.controllers.kubernetes import KubernetesController
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType


class KubernetesControllerTests(TestCase):
    """Tests for the Kubernetes outpost controller and its reconcilers"""

    @reconcile_app("authentik_outposts")
    def setUp(self) -> None:
        """Create a proxy outpost and instantiate a Kubernetes service connection"""
        self.outpost = Outpost.objects.create(name="test", type=OutpostType.PROXY)
        # Only instantiated here; nothing in this fixture saves the connection
        self.integration = KubernetesServiceConnection(name="test")

    def test_gen_name(self):
        """Ensure the generated name is valid"""
        managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        # A not-none client is handed in so that no connection to K8s
        # is attempted, as that's not needed for name generation
        k8s_controller = KubernetesController(
            managed_outpost,
            self.integration,
            client=ApiClient(),
        )
        reconciler = DeploymentReconciler(k8s_controller)
        self.assertEqual(reconciler.name, "ak-outpost-authentik-embedded-outpost")

        # After renaming the outpost to a generated ID the name must stay below 64 characters
        k8s_controller.outpost.name = generate_id()
        self.assertLess(len(reconciler.name), 64)

        # Test custom naming template: blank it out and write the config back
        outpost_config = k8s_controller.outpost.config
        outpost_config.object_naming_template = ""
        k8s_controller.outpost.config = outpost_config
        expected_name = f"outpost-{k8s_controller.outpost.uuid.hex}"
        self.assertEqual(reconciler.name, expected_name)
        self.assertLess(len(reconciler.name), 64)

    def test_static(self):
        """Ensure the static deployment renders to five YAML documents"""
        self.controller = KubernetesController(
            self.outpost,
            self.integration,
            # A not-none client is handed in so that no connection
            # to K8s is attempted, as that's not needed
            client=ApiClient(),
        )
        # Make the service monitor reconciler report that its CRD exists
        crd_exists = MagicMock(return_value=True)
        with patch.object(PrometheusServiceMonitorReconciler, "_crd_exists", crd_exists):
            rendered = self.controller.get_static_deployment()
        documents = list(load_all(rendered, Loader=SafeLoader))
        self.assertEqual(len(documents), 5)
class
KubernetesControllerTests(django.test.testcases.TestCase):
class KubernetesControllerTests(TestCase):
    """Tests for the Kubernetes outpost controller and its reconcilers"""

    @reconcile_app("authentik_outposts")
    def setUp(self) -> None:
        """Create a proxy outpost and instantiate a Kubernetes service connection"""
        self.outpost = Outpost.objects.create(name="test", type=OutpostType.PROXY)
        # Only instantiated here; nothing in this fixture saves the connection
        self.integration = KubernetesServiceConnection(name="test")

    def test_gen_name(self):
        """Ensure the generated name is valid"""
        managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        # A not-none client is handed in so that no connection to K8s
        # is attempted, as that's not needed for name generation
        k8s_controller = KubernetesController(
            managed_outpost,
            self.integration,
            client=ApiClient(),
        )
        reconciler = DeploymentReconciler(k8s_controller)
        self.assertEqual(reconciler.name, "ak-outpost-authentik-embedded-outpost")

        # After renaming the outpost to a generated ID the name must stay below 64 characters
        k8s_controller.outpost.name = generate_id()
        self.assertLess(len(reconciler.name), 64)

        # Test custom naming template: blank it out and write the config back
        outpost_config = k8s_controller.outpost.config
        outpost_config.object_naming_template = ""
        k8s_controller.outpost.config = outpost_config
        expected_name = f"outpost-{k8s_controller.outpost.uuid.hex}"
        self.assertEqual(reconciler.name, expected_name)
        self.assertLess(len(reconciler.name), 64)

    def test_static(self):
        """Ensure the static deployment renders to five YAML documents"""
        self.controller = KubernetesController(
            self.outpost,
            self.integration,
            # A not-none client is handed in so that no connection
            # to K8s is attempted, as that's not needed
            client=ApiClient(),
        )
        # Make the service monitor reconciler report that its CRD exists
        crd_exists = MagicMock(return_value=True)
        with patch.object(PrometheusServiceMonitorReconciler, "_crd_exists", crd_exists):
            rendered = self.controller.get_static_deployment()
        documents = list(load_all(rendered, Loader=SafeLoader))
        self.assertEqual(len(documents), 5)
Kubernetes controller tests
@reconcile_app('authentik_outposts')
def
setUp(self) -> None:
@reconcile_app("authentik_outposts")
def setUp(self) -> None:
    """Create a proxy outpost and instantiate a Kubernetes service connection"""
    self.outpost = Outpost.objects.create(name="test", type=OutpostType.PROXY)
    # Only instantiated here; nothing in this fixture saves the connection
    self.integration = KubernetesServiceConnection(name="test")
Hook method for setting up the test fixture before exercising it.
def
test_gen_name(self):
def test_gen_name(self):
    """Ensure the generated name is valid"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    # A not-none client is handed in so that no connection to K8s
    # is attempted, as that's not needed for name generation
    k8s_controller = KubernetesController(
        managed_outpost,
        self.integration,
        client=ApiClient(),
    )
    reconciler = DeploymentReconciler(k8s_controller)
    self.assertEqual(reconciler.name, "ak-outpost-authentik-embedded-outpost")

    # After renaming the outpost to a generated ID the name must stay below 64 characters
    k8s_controller.outpost.name = generate_id()
    self.assertLess(len(reconciler.name), 64)

    # Test custom naming template: blank it out and write the config back
    outpost_config = k8s_controller.outpost.config
    outpost_config.object_naming_template = ""
    k8s_controller.outpost.config = outpost_config
    expected_name = f"outpost-{k8s_controller.outpost.uuid.hex}"
    self.assertEqual(reconciler.name, expected_name)
    self.assertLess(len(reconciler.name), 64)
Ensure the generated name is valid
def
test_static(self):
def test_static(self):
    """Ensure the static deployment renders to five YAML documents"""
    self.controller = KubernetesController(
        self.outpost,
        self.integration,
        # A not-none client is handed in so that no connection
        # to K8s is attempted, as that's not needed
        client=ApiClient(),
    )
    # Make the service monitor reconciler report that its CRD exists
    crd_exists = MagicMock(return_value=True)
    with patch.object(PrometheusServiceMonitorReconciler, "_crd_exists", crd_exists):
        rendered = self.controller.get_static_deployment()
    documents = list(load_all(rendered, Loader=SafeLoader))
    self.assertEqual(len(documents), 5)