authentik.enterprise.stages.mtls.tests.test_stage
1from ssl import PEM_FOOTER, PEM_HEADER 2from unittest.mock import MagicMock, patch 3from urllib.parse import quote_plus 4 5from django.urls import reverse 6 7from authentik.core.models import User 8from authentik.core.tests.utils import ( 9 create_test_brand, 10 create_test_cert, 11 create_test_flow, 12 create_test_user, 13) 14from authentik.crypto.models import CertificateKeyPair 15from authentik.endpoints.models import StageMode 16from authentik.enterprise.stages.mtls.models import ( 17 CertAttributes, 18 MutualTLSStage, 19 UserAttributes, 20) 21from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE 22from authentik.flows.models import FlowDesignation, FlowStageBinding 23from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 24from authentik.flows.tests import FlowTestCase 25from authentik.lib.generators import generate_id 26from authentik.lib.tests.utils import load_fixture 27from authentik.outposts.models import Outpost, OutpostType 28from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 29 30 31class MTLSStageTests(FlowTestCase): 32 33 def setUp(self): 34 super().setUp() 35 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 36 self.ca = CertificateKeyPair.objects.create( 37 name=generate_id(), 38 certificate_data=load_fixture("fixtures/ca.pem"), 39 ) 40 self.stage = MutualTLSStage.objects.create( 41 name=generate_id(), 42 mode=StageMode.REQUIRED, 43 cert_attribute=CertAttributes.COMMON_NAME, 44 user_attribute=UserAttributes.USERNAME, 45 ) 46 47 self.stage.certificate_authorities.add(self.ca) 48 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 49 self.client_cert = load_fixture("fixtures/cert_client.pem") 50 # User matching the certificate 51 User.objects.filter(username="client").delete() 52 self.cert_user = create_test_user(username="client") 53 54 def _format_traefik(self, cert: str | None = None): 55 cert = cert if cert else self.client_cert 56 return quote_plus(cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "")) 57 58 def test_parse_xfcc(self): 59 """Test authentik Proxy/Envoy's XFCC format""" 60 with self.assertFlowFinishes() as plan: 61 res = self.client.get( 62 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 63 headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}, 64 ) 65 self.assertEqual(res.status_code, 200) 66 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 67 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 68 69 def test_parse_nginx(self): 70 """Test nginx's format""" 71 with self.assertFlowFinishes() as plan: 72 res = self.client.get( 73 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 74 headers={"SSL-Client-Cert": quote_plus(self.client_cert)}, 75 ) 76 self.assertEqual(res.status_code, 200) 77 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 78 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 79 80 def test_parse_traefik(self): 81 """Test traefik's format""" 82 with self.assertFlowFinishes() as plan: 83 res = self.client.get( 84 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 85 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 86 ) 87 self.assertEqual(res.status_code, 200) 88 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 89 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 90 91 def test_parse_outpost_object(self): 92 """Test outposts's format""" 93 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 94 outpost.user.assign_perms_to_managed_role( 95 "authentik_stages_mtls.pass_outpost_certificate", self.stage 96 ) 97 with patch( 98 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 99 MagicMock(return_value=outpost.user), 100 ): 101 with self.assertFlowFinishes() as plan: 102 res = self.client.get( 103 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 104 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 105 ) 106 self.assertEqual(res.status_code, 200) 107 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 108 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 109 110 def test_parse_outpost_global(self): 111 """Test outposts's format""" 112 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 113 outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate") 114 with patch( 115 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 116 MagicMock(return_value=outpost.user), 117 ): 118 with self.assertFlowFinishes() as plan: 119 res = self.client.get( 120 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 121 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 122 ) 123 self.assertEqual(res.status_code, 200) 124 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 125 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 126 127 def test_parse_outpost_no_perm(self): 128 """Test outposts's format""" 129 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 130 with patch( 131 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 132 MagicMock(return_value=outpost.user), 133 ): 134 res = self.client.get( 135 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 136 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 137 ) 138 self.assertEqual(res.status_code, 200) 139 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 140 141 def test_invalid_cert(self): 142 """Test invalid certificate""" 143 cert = create_test_cert() 144 with self.assertFlowFinishes() as plan: 145 res = self.client.get( 146 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 147 headers={ 148 "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data) 149 }, 150 ) 151 self.assertEqual(res.status_code, 200) 152 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 153 self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context) 154 155 def test_auth_no_user(self): 156 """Test auth with no user""" 157 User.objects.filter(username="client").delete() 158 res = self.client.get( 159 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 160 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 161 ) 162 self.assertEqual(res.status_code, 200) 163 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 164 165 def test_brand_ca(self): 166 """Test using a CA from the brand""" 167 self.stage.certificate_authorities.clear() 168 169 brand = create_test_brand() 170 brand.client_certificates.add(self.ca) 171 with self.assertFlowFinishes() as plan: 172 res = self.client.get( 173 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 174 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 175 ) 176 self.assertEqual(res.status_code, 200) 177 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 178 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 179 180 def test_no_ca_optional(self): 181 """Test using no CA Set""" 182 self.stage.mode = StageMode.OPTIONAL 183 self.stage.certificate_authorities.clear() 184 self.stage.save() 185 res = self.client.get( 186 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 187 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 188 ) 189 self.assertEqual(res.status_code, 200) 190 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 191 192 def test_no_ca_required(self): 193 """Test using no CA Set""" 194 self.stage.certificate_authorities.clear() 195 self.stage.save() 196 res = self.client.get( 197 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 198 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 199 ) 200 self.assertEqual(res.status_code, 200) 201 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 202 203 def test_no_cert_optional(self): 204 """Test using no cert Set""" 205 self.stage.mode = StageMode.OPTIONAL 206 self.stage.save() 207 res = self.client.get( 208 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 209 ) 210 self.assertEqual(res.status_code, 200) 211 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 212 213 def test_enroll(self): 214 """Test Enrollment flow""" 215 self.flow.designation = FlowDesignation.ENROLLMENT 216 self.flow.save() 217 with self.assertFlowFinishes() as plan: 218 res = self.client.get( 219 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 220 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 221 ) 222 self.assertEqual(res.status_code, 200) 223 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 224 self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"}) 225 self.assertEqual( 226 plan().context[PLAN_CONTEXT_CERTIFICATE], 227 { 228 "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a", 229 "fingerprint_sha256": ( 230 "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7" 231 ), 232 "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA", 233 "serial_number": "70153443448884702681996102271549704759327537151", 234 "subject": "CN=client", 235 }, 236 )
32class MTLSStageTests(FlowTestCase): 33 34 def setUp(self): 35 super().setUp() 36 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 37 self.ca = CertificateKeyPair.objects.create( 38 name=generate_id(), 39 certificate_data=load_fixture("fixtures/ca.pem"), 40 ) 41 self.stage = MutualTLSStage.objects.create( 42 name=generate_id(), 43 mode=StageMode.REQUIRED, 44 cert_attribute=CertAttributes.COMMON_NAME, 45 user_attribute=UserAttributes.USERNAME, 46 ) 47 48 self.stage.certificate_authorities.add(self.ca) 49 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 50 self.client_cert = load_fixture("fixtures/cert_client.pem") 51 # User matching the certificate 52 User.objects.filter(username="client").delete() 53 self.cert_user = create_test_user(username="client") 54 55 def _format_traefik(self, cert: str | None = None): 56 cert = cert if cert else self.client_cert 57 return quote_plus(cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "")) 58 59 def test_parse_xfcc(self): 60 """Test authentik Proxy/Envoy's XFCC format""" 61 with self.assertFlowFinishes() as plan: 62 res = self.client.get( 63 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 64 headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}, 65 ) 66 self.assertEqual(res.status_code, 200) 67 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 68 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 69 70 def test_parse_nginx(self): 71 """Test nginx's format""" 72 with self.assertFlowFinishes() as plan: 73 res = self.client.get( 74 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 75 headers={"SSL-Client-Cert": quote_plus(self.client_cert)}, 76 ) 77 self.assertEqual(res.status_code, 200) 78 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 79 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 80 81 def test_parse_traefik(self): 82 """Test traefik's format""" 83 with self.assertFlowFinishes() as plan: 84 res = self.client.get( 85 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 86 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 87 ) 88 self.assertEqual(res.status_code, 200) 89 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 90 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 91 92 def test_parse_outpost_object(self): 93 """Test outposts's format""" 94 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 95 outpost.user.assign_perms_to_managed_role( 96 "authentik_stages_mtls.pass_outpost_certificate", self.stage 97 ) 98 with patch( 99 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 100 MagicMock(return_value=outpost.user), 101 ): 102 with self.assertFlowFinishes() as plan: 103 res = self.client.get( 104 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 105 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 106 ) 107 self.assertEqual(res.status_code, 200) 108 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 109 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 110 111 def test_parse_outpost_global(self): 112 """Test outposts's format""" 113 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 114 outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate") 115 with patch( 116 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 117 MagicMock(return_value=outpost.user), 118 ): 119 with self.assertFlowFinishes() as plan: 120 res = self.client.get( 121 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 122 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 123 ) 124 self.assertEqual(res.status_code, 200) 125 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 126 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 127 128 def test_parse_outpost_no_perm(self): 129 """Test outposts's format""" 130 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 131 with patch( 132 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 133 MagicMock(return_value=outpost.user), 134 ): 135 res = self.client.get( 136 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 137 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 138 ) 139 self.assertEqual(res.status_code, 200) 140 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 141 142 def test_invalid_cert(self): 143 """Test invalid certificate""" 144 cert = create_test_cert() 145 with self.assertFlowFinishes() as plan: 146 res = self.client.get( 147 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 148 headers={ 149 "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data) 150 }, 151 ) 152 self.assertEqual(res.status_code, 200) 153 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 154 self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context) 155 156 def test_auth_no_user(self): 157 """Test auth with no user""" 158 User.objects.filter(username="client").delete() 159 res = self.client.get( 160 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 161 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 162 ) 163 self.assertEqual(res.status_code, 200) 164 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 165 166 def test_brand_ca(self): 167 """Test using a CA from the brand""" 168 self.stage.certificate_authorities.clear() 169 170 brand = create_test_brand() 171 brand.client_certificates.add(self.ca) 172 with self.assertFlowFinishes() as plan: 173 res = self.client.get( 174 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 175 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 176 ) 177 self.assertEqual(res.status_code, 200) 178 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 179 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 180 181 def test_no_ca_optional(self): 182 """Test using no CA Set""" 183 self.stage.mode = StageMode.OPTIONAL 184 self.stage.certificate_authorities.clear() 185 self.stage.save() 186 res = self.client.get( 187 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 188 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 189 ) 190 self.assertEqual(res.status_code, 200) 191 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 192 193 def test_no_ca_required(self): 194 """Test using no CA Set""" 195 self.stage.certificate_authorities.clear() 196 self.stage.save() 197 res = self.client.get( 198 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 199 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 200 ) 201 self.assertEqual(res.status_code, 200) 202 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 203 204 def test_no_cert_optional(self): 205 """Test using no cert Set""" 206 self.stage.mode = StageMode.OPTIONAL 207 self.stage.save() 208 res = self.client.get( 209 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 210 ) 211 self.assertEqual(res.status_code, 200) 212 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 213 214 def test_enroll(self): 215 """Test Enrollment flow""" 216 self.flow.designation = FlowDesignation.ENROLLMENT 217 self.flow.save() 218 with self.assertFlowFinishes() as plan: 219 res = self.client.get( 220 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 221 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 222 ) 223 self.assertEqual(res.status_code, 200) 224 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 225 self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"}) 226 self.assertEqual( 227 plan().context[PLAN_CONTEXT_CERTIFICATE], 228 { 229 "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a", 230 "fingerprint_sha256": ( 231 "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7" 232 ), 233 "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA", 234 "serial_number": "70153443448884702681996102271549704759327537151", 235 "subject": "CN=client", 236 }, 237 )
Helpers for testing flows and stages.
def
setUp(self):
34 def setUp(self): 35 super().setUp() 36 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 37 self.ca = CertificateKeyPair.objects.create( 38 name=generate_id(), 39 certificate_data=load_fixture("fixtures/ca.pem"), 40 ) 41 self.stage = MutualTLSStage.objects.create( 42 name=generate_id(), 43 mode=StageMode.REQUIRED, 44 cert_attribute=CertAttributes.COMMON_NAME, 45 user_attribute=UserAttributes.USERNAME, 46 ) 47 48 self.stage.certificate_authorities.add(self.ca) 49 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 50 self.client_cert = load_fixture("fixtures/cert_client.pem") 51 # User matching the certificate 52 User.objects.filter(username="client").delete() 53 self.cert_user = create_test_user(username="client")
Hook method for setting up the test fixture before exercising it.
def
test_parse_xfcc(self):
59 def test_parse_xfcc(self): 60 """Test authentik Proxy/Envoy's XFCC format""" 61 with self.assertFlowFinishes() as plan: 62 res = self.client.get( 63 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 64 headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}, 65 ) 66 self.assertEqual(res.status_code, 200) 67 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 68 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test authentik Proxy/Envoy's XFCC format
def
test_parse_nginx(self):
70 def test_parse_nginx(self): 71 """Test nginx's format""" 72 with self.assertFlowFinishes() as plan: 73 res = self.client.get( 74 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 75 headers={"SSL-Client-Cert": quote_plus(self.client_cert)}, 76 ) 77 self.assertEqual(res.status_code, 200) 78 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 79 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test nginx's format
def
test_parse_traefik(self):
81 def test_parse_traefik(self): 82 """Test traefik's format""" 83 with self.assertFlowFinishes() as plan: 84 res = self.client.get( 85 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 86 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 87 ) 88 self.assertEqual(res.status_code, 200) 89 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 90 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test traefik's format
def
test_parse_outpost_object(self):
92 def test_parse_outpost_object(self): 93 """Test outposts's format""" 94 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 95 outpost.user.assign_perms_to_managed_role( 96 "authentik_stages_mtls.pass_outpost_certificate", self.stage 97 ) 98 with patch( 99 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 100 MagicMock(return_value=outpost.user), 101 ): 102 with self.assertFlowFinishes() as plan: 103 res = self.client.get( 104 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 105 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 106 ) 107 self.assertEqual(res.status_code, 200) 108 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 109 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test outposts's format
def
test_parse_outpost_global(self):
111 def test_parse_outpost_global(self): 112 """Test outposts's format""" 113 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 114 outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate") 115 with patch( 116 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 117 MagicMock(return_value=outpost.user), 118 ): 119 with self.assertFlowFinishes() as plan: 120 res = self.client.get( 121 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 122 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 123 ) 124 self.assertEqual(res.status_code, 200) 125 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 126 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test outposts's format
def
test_parse_outpost_no_perm(self):
128 def test_parse_outpost_no_perm(self): 129 """Test outposts's format""" 130 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 131 with patch( 132 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 133 MagicMock(return_value=outpost.user), 134 ): 135 res = self.client.get( 136 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 137 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 138 ) 139 self.assertEqual(res.status_code, 200) 140 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
Test outposts's format
def
test_invalid_cert(self):
142 def test_invalid_cert(self): 143 """Test invalid certificate""" 144 cert = create_test_cert() 145 with self.assertFlowFinishes() as plan: 146 res = self.client.get( 147 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 148 headers={ 149 "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data) 150 }, 151 ) 152 self.assertEqual(res.status_code, 200) 153 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 154 self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)
Test invalid certificate
def
test_auth_no_user(self):
156 def test_auth_no_user(self): 157 """Test auth with no user""" 158 User.objects.filter(username="client").delete() 159 res = self.client.get( 160 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 161 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 162 ) 163 self.assertEqual(res.status_code, 200) 164 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
Test auth with no user
def
test_brand_ca(self):
166 def test_brand_ca(self): 167 """Test using a CA from the brand""" 168 self.stage.certificate_authorities.clear() 169 170 brand = create_test_brand() 171 brand.client_certificates.add(self.ca) 172 with self.assertFlowFinishes() as plan: 173 res = self.client.get( 174 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 175 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 176 ) 177 self.assertEqual(res.status_code, 200) 178 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 179 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test using a CA from the brand
def
test_no_ca_optional(self):
181 def test_no_ca_optional(self): 182 """Test using no CA Set""" 183 self.stage.mode = StageMode.OPTIONAL 184 self.stage.certificate_authorities.clear() 185 self.stage.save() 186 res = self.client.get( 187 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 188 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 189 ) 190 self.assertEqual(res.status_code, 200) 191 self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
Test using no CA Set
def
test_no_ca_required(self):
193 def test_no_ca_required(self): 194 """Test using no CA Set""" 195 self.stage.certificate_authorities.clear() 196 self.stage.save() 197 res = self.client.get( 198 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 199 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 200 ) 201 self.assertEqual(res.status_code, 200) 202 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
Test using no CA Set
def
test_no_cert_optional(self):
204 def test_no_cert_optional(self): 205 """Test using no cert Set""" 206 self.stage.mode = StageMode.OPTIONAL 207 self.stage.save() 208 res = self.client.get( 209 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 210 ) 211 self.assertEqual(res.status_code, 200) 212 self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
Test using no cert Set
def
test_enroll(self):
214 def test_enroll(self): 215 """Test Enrollment flow""" 216 self.flow.designation = FlowDesignation.ENROLLMENT 217 self.flow.save() 218 with self.assertFlowFinishes() as plan: 219 res = self.client.get( 220 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 221 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 222 ) 223 self.assertEqual(res.status_code, 200) 224 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 225 self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"}) 226 self.assertEqual( 227 plan().context[PLAN_CONTEXT_CERTIFICATE], 228 { 229 "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a", 230 "fingerprint_sha256": ( 231 "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7" 232 ), 233 "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA", 234 "serial_number": "70153443448884702681996102271549704759327537151", 235 "subject": "CN=client", 236 }, 237 )
Test Enrollment flow