authentik.outposts.tests.test_controller_docker
Docker controller tests
"""Docker controller tests"""

from django.test import TestCase
from docker.models.containers import Container

from authentik.blueprints.tests import reconcile_app
from authentik.outposts.apps import MANAGED_OUTPOST
from authentik.outposts.controllers.base import ControllerException
from authentik.outposts.controllers.docker import DockerController
from authentik.outposts.models import DockerServiceConnection, Outpost, OutpostType
from authentik.providers.proxy.controllers.docker import ProxyDockerController


class DockerControllerTests(TestCase):
    """Tests for the Docker outpost controllers.

    Covers controller construction, ``up()``/``down()`` on the managed outpost,
    and the ``_comp_env`` / ``_comp_labels`` / ``_comp_ports`` comparison helpers.
    """

    @reconcile_app("authentik_outposts")
    def setUp(self) -> None:
        """Create a proxy outpost named "test" and a Docker connection object."""
        self.outpost = Outpost.objects.create(name="test", type=OutpostType.PROXY)
        self.integration = DockerServiceConnection(name="test")

    def _managed_controller(self, controller_cls=DockerController):
        """Build ``controller_cls`` for the first outpost whose ``managed`` is MANAGED_OUTPOST."""
        managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        return controller_cls(managed_outpost, self.integration)

    def test_init_managed(self):
        """For the managed outpost, both up() and down() return None"""
        controller = self._managed_controller()
        up_result = controller.up()
        self.assertIsNone(up_result)
        down_result = controller.down()
        self.assertIsNone(down_result)

    def test_init_invalid(self):
        """Constructing a controller from the test fixtures raises ControllerException"""
        self.assertRaises(ControllerException, DockerController, self.outpost, self.integration)

    def test_env_valid(self):
        """_comp_env is falsy when the container env equals the controller's own env"""
        controller = self._managed_controller()
        expected_env = controller._get_env()
        env = [f"{key}={value}" for key, value in expected_env.items()]
        container = Container(attrs={"Config": {"Env": env}})
        self.assertFalse(controller._comp_env(container))

    def test_env_invalid(self):
        """_comp_env is truthy when the container has an empty env list"""
        controller = self._managed_controller()
        config = {"Env": []}
        container = Container(attrs={"Config": config})
        self.assertTrue(controller._comp_env(container))

    def test_label_valid(self):
        """_comp_labels is falsy when the container carries the controller's own labels"""
        controller = self._managed_controller()
        labels = controller._get_labels()
        container = Container(attrs={"Config": {"Labels": labels}})
        self.assertFalse(controller._comp_labels(container))

    def test_label_invalid(self):
        """_comp_labels is truthy for empty labels and for a foreign outpost-uuid label"""
        controller = self._managed_controller()
        # Checked in order: no labels at all, then a single label with the value "foo".
        for labels in ({}, {"io.goauthentik.outpost-uuid": "foo"}):
            container = Container(attrs={"Config": {"Labels": labels}})
            self.assertTrue(controller._comp_labels(container))

    def test_port_valid(self):
        """_comp_ports is falsy for 9000/9443 mappings, both with empty and "running" state"""
        controller = self._managed_controller(ProxyDockerController)
        bindings = {
            "9000/tcp": [{"HostIp": "", "HostPort": "9000"}],
            "9443/tcp": [{"HostIp": "", "HostPort": "9443"}],
        }
        container = Container(attrs={"NetworkSettings": {"Ports": bindings}, "State": ""})
        with self.settings(TEST=False):
            self.assertFalse(controller._comp_ports(container))
            # Same container again, now with its state set to "running".
            container.attrs["State"] = "running"
            self.assertFalse(controller._comp_ports(container))

    def test_port_invalid(self):
        """_comp_ports: falsy with no ports, truthy with a missing or mismatched mapping"""
        controller = self._managed_controller(ProxyDockerController)

        def running(ports):
            """Build a container in state "running" with the given port mapping."""
            return Container(attrs={"NetworkSettings": {"Ports": ports}, "State": "running"})

        container_no_ports = running(None)
        container_missing_port = running({"9443/tcp": [{"HostIp": "", "HostPort": "9443"}]})
        container_mismatched_host = running({"9443/tcp": [{"HostIp": "", "HostPort": "123"}]})
        with self.settings(TEST=False):
            self.assertFalse(controller._comp_ports(container_no_ports))
            self.assertTrue(controller._comp_ports(container_missing_port))
            self.assertTrue(controller._comp_ports(container_mismatched_host))
class
DockerControllerTests(django.test.testcases.TestCase):
class DockerControllerTests(TestCase):
    """Tests for the Docker outpost controllers.

    Covers controller construction, ``up()``/``down()`` on the managed outpost,
    and the ``_comp_env`` / ``_comp_labels`` / ``_comp_ports`` comparison helpers.
    """

    @reconcile_app("authentik_outposts")
    def setUp(self) -> None:
        """Create a proxy outpost named "test" and a Docker connection object."""
        self.outpost = Outpost.objects.create(name="test", type=OutpostType.PROXY)
        self.integration = DockerServiceConnection(name="test")

    def _managed_controller(self, controller_cls=DockerController):
        """Build ``controller_cls`` for the first outpost whose ``managed`` is MANAGED_OUTPOST."""
        managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
        return controller_cls(managed_outpost, self.integration)

    def test_init_managed(self):
        """For the managed outpost, both up() and down() return None"""
        controller = self._managed_controller()
        up_result = controller.up()
        self.assertIsNone(up_result)
        down_result = controller.down()
        self.assertIsNone(down_result)

    def test_init_invalid(self):
        """Constructing a controller from the test fixtures raises ControllerException"""
        self.assertRaises(ControllerException, DockerController, self.outpost, self.integration)

    def test_env_valid(self):
        """_comp_env is falsy when the container env equals the controller's own env"""
        controller = self._managed_controller()
        expected_env = controller._get_env()
        env = [f"{key}={value}" for key, value in expected_env.items()]
        container = Container(attrs={"Config": {"Env": env}})
        self.assertFalse(controller._comp_env(container))

    def test_env_invalid(self):
        """_comp_env is truthy when the container has an empty env list"""
        controller = self._managed_controller()
        config = {"Env": []}
        container = Container(attrs={"Config": config})
        self.assertTrue(controller._comp_env(container))

    def test_label_valid(self):
        """_comp_labels is falsy when the container carries the controller's own labels"""
        controller = self._managed_controller()
        labels = controller._get_labels()
        container = Container(attrs={"Config": {"Labels": labels}})
        self.assertFalse(controller._comp_labels(container))

    def test_label_invalid(self):
        """_comp_labels is truthy for empty labels and for a foreign outpost-uuid label"""
        controller = self._managed_controller()
        # Checked in order: no labels at all, then a single label with the value "foo".
        for labels in ({}, {"io.goauthentik.outpost-uuid": "foo"}):
            container = Container(attrs={"Config": {"Labels": labels}})
            self.assertTrue(controller._comp_labels(container))

    def test_port_valid(self):
        """_comp_ports is falsy for 9000/9443 mappings, both with empty and "running" state"""
        controller = self._managed_controller(ProxyDockerController)
        bindings = {
            "9000/tcp": [{"HostIp": "", "HostPort": "9000"}],
            "9443/tcp": [{"HostIp": "", "HostPort": "9443"}],
        }
        container = Container(attrs={"NetworkSettings": {"Ports": bindings}, "State": ""})
        with self.settings(TEST=False):
            self.assertFalse(controller._comp_ports(container))
            # Same container again, now with its state set to "running".
            container.attrs["State"] = "running"
            self.assertFalse(controller._comp_ports(container))

    def test_port_invalid(self):
        """_comp_ports: falsy with no ports, truthy with a missing or mismatched mapping"""
        controller = self._managed_controller(ProxyDockerController)

        def running(ports):
            """Build a container in state "running" with the given port mapping."""
            return Container(attrs={"NetworkSettings": {"Ports": ports}, "State": "running"})

        container_no_ports = running(None)
        container_missing_port = running({"9443/tcp": [{"HostIp": "", "HostPort": "9443"}]})
        container_mismatched_host = running({"9443/tcp": [{"HostIp": "", "HostPort": "123"}]})
        with self.settings(TEST=False):
            self.assertFalse(controller._comp_ports(container_no_ports))
            self.assertTrue(controller._comp_ports(container_missing_port))
            self.assertTrue(controller._comp_ports(container_mismatched_host))
Docker controller tests
@reconcile_app('authentik_outposts')
def
setUp(self) -> None:
@reconcile_app("authentik_outposts")
def setUp(self) -> None:
    """Create a proxy outpost named "test" and a Docker connection object."""
    outpost_fields = {"name": "test", "type": OutpostType.PROXY}
    self.outpost = Outpost.objects.create(**outpost_fields)
    self.integration = DockerServiceConnection(name="test")
Hook method for setting up the test fixture before exercising it.
def
test_init_managed(self):
def test_init_managed(self):
    """For the managed outpost, both up() and down() return None"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = DockerController(managed_outpost, self.integration)
    up_result = controller.up()
    self.assertIsNone(up_result)
    down_result = controller.down()
    self.assertIsNone(down_result)
Docker controller shouldn't do anything for managed outpost
def
test_init_invalid(self):
def test_init_invalid(self):
    """Constructing a controller from the test fixtures raises ControllerException"""
    # Callable form of assertRaises: invokes DockerController(self.outpost, self.integration).
    self.assertRaises(ControllerException, DockerController, self.outpost, self.integration)
Ensure init fails with invalid client
def
test_env_valid(self):
def test_env_valid(self):
    """_comp_env is falsy when the container env equals the controller's own env"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = DockerController(managed_outpost, self.integration)
    expected_env = controller._get_env()
    # Rendered as a list of "KEY=value" strings under Config.Env.
    env = [f"{key}={value}" for key, value in expected_env.items()]
    container = Container(attrs={"Config": {"Env": env}})
    self.assertFalse(controller._comp_env(container))
Test environment check
def
test_env_invalid(self):
def test_env_invalid(self):
    """_comp_env is truthy when the container has an empty env list"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = DockerController(managed_outpost, self.integration)
    config = {"Env": []}
    container = Container(attrs={"Config": config})
    self.assertTrue(controller._comp_env(container))
Test environment check
def
test_label_valid(self):
def test_label_valid(self):
    """_comp_labels is falsy when the container carries the controller's own labels"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = DockerController(managed_outpost, self.integration)
    labels = controller._get_labels()
    container = Container(attrs={"Config": {"Labels": labels}})
    self.assertFalse(controller._comp_labels(container))
Test label check
def
test_label_invalid(self):
def test_label_invalid(self):
    """_comp_labels is truthy for empty labels and for a foreign outpost-uuid label"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = DockerController(managed_outpost, self.integration)
    # Checked in order: no labels at all, then a single label with the value "foo".
    for labels in ({}, {"io.goauthentik.outpost-uuid": "foo"}):
        container = Container(attrs={"Config": {"Labels": labels}})
        self.assertTrue(controller._comp_labels(container))
Test label check
def
test_port_valid(self):
def test_port_valid(self):
    """_comp_ports is falsy for 9000/9443 mappings, both with empty and "running" state"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = ProxyDockerController(managed_outpost, self.integration)
    bindings = {
        "9000/tcp": [{"HostIp": "", "HostPort": "9000"}],
        "9443/tcp": [{"HostIp": "", "HostPort": "9443"}],
    }
    container = Container(attrs={"NetworkSettings": {"Ports": bindings}, "State": ""})
    with self.settings(TEST=False):
        self.assertFalse(controller._comp_ports(container))
        # Same container again, now with its state set to "running".
        container.attrs["State"] = "running"
        self.assertFalse(controller._comp_ports(container))
Test port check
def
test_port_invalid(self):
def test_port_invalid(self):
    """_comp_ports: falsy with no ports, truthy with a missing or mismatched mapping"""
    managed_outpost = Outpost.objects.filter(managed=MANAGED_OUTPOST).first()
    controller = ProxyDockerController(managed_outpost, self.integration)

    def running(ports):
        """Build a container in state "running" with the given port mapping."""
        return Container(attrs={"NetworkSettings": {"Ports": ports}, "State": "running"})

    container_no_ports = running(None)
    container_missing_port = running({"9443/tcp": [{"HostIp": "", "HostPort": "9443"}]})
    container_mismatched_host = running({"9443/tcp": [{"HostIp": "", "HostPort": "123"}]})
    with self.settings(TEST=False):
        self.assertFalse(controller._comp_ports(container_no_ports))
        self.assertTrue(controller._comp_ports(container_missing_port))
        self.assertTrue(controller._comp_ports(container_mismatched_host))
Test port check