authentik.enterprise.stages.mtls.tests.test_stage
1from ssl import PEM_FOOTER, PEM_HEADER 2from unittest.mock import MagicMock, patch 3from urllib.parse import quote_plus 4 5from django.urls import reverse 6from freezegun import freeze_time 7 8from authentik.core.models import User 9from authentik.core.tests.utils import ( 10 create_test_brand, 11 create_test_cert, 12 create_test_flow, 13 create_test_user, 14) 15from authentik.crypto.models import CertificateKeyPair 16from authentik.endpoints.models import StageMode 17from authentik.enterprise.stages.mtls.models import ( 18 CertAttributes, 19 MutualTLSStage, 20 UserAttributes, 21) 22from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE 23from authentik.flows.models import FlowDesignation, FlowStageBinding 24from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 25from authentik.flows.tests import FlowTestCase 26from authentik.lib.generators import generate_id 27from authentik.lib.tests.utils import load_fixture 28from authentik.outposts.models import Outpost, OutpostType 29from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 30 31 32@freeze_time("2026-05-10 12:38:46") 33class MTLSStageTests(FlowTestCase): 34 35 def setUp(self): 36 super().setUp() 37 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 38 self.ca = CertificateKeyPair.objects.create( 39 name=generate_id(), 40 certificate_data=load_fixture("fixtures/ca.pem"), 41 ) 42 self.stage = MutualTLSStage.objects.create( 43 name=generate_id(), 44 mode=StageMode.REQUIRED, 45 cert_attribute=CertAttributes.COMMON_NAME, 46 user_attribute=UserAttributes.USERNAME, 47 ) 48 49 self.stage.certificate_authorities.add(self.ca) 50 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 51 self.client_cert = load_fixture("fixtures/cert_client.pem") 52 # User matching the certificate 53 User.objects.filter(username="client").delete() 54 self.cert_user = create_test_user(username="client") 55 56 def _format_traefik(self, cert: str | None = None): 57 cert = cert if cert else self.client_cert 58 return cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "") 59 60 def test_parse_xfcc(self): 61 """Test authentik Proxy/Envoy's XFCC format""" 62 with self.assertFlowFinishes() as plan: 63 res = self.client.get( 64 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 65 headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}, 66 ) 67 self.assertEqual(res.status_code, 200) 68 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 69 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 70 71 def test_parse_nginx(self): 72 """Test nginx's format""" 73 with self.assertFlowFinishes() as plan: 74 res = self.client.get( 75 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 76 headers={"SSL-Client-Cert": quote_plus(self.client_cert)}, 77 ) 78 self.assertEqual(res.status_code, 200) 79 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 80 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 81 82 def test_parse_traefik(self): 83 """Test traefik's format""" 84 with self.assertFlowFinishes() as plan: 85 res = self.client.get( 86 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 87 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 88 ) 89 self.assertEqual(res.status_code, 200) 90 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 91 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 92 93 def test_parse_outpost_object(self): 94 """Test outposts's format""" 95 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 96 outpost.user.assign_perms_to_managed_role( 97 "authentik_stages_mtls.pass_outpost_certificate", self.stage 98 ) 99 with patch( 100 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 101 MagicMock(return_value=outpost.user), 102 ): 103 with self.assertFlowFinishes() as plan: 104 res = self.client.get( 105 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 106 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 107 ) 108 self.assertEqual(res.status_code, 200) 109 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 110 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 111 112 def test_parse_outpost_global(self): 113 """Test outposts's format""" 114 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 115 outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate") 116 with patch( 117 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 118 MagicMock(return_value=outpost.user), 119 ): 120 with self.assertFlowFinishes() as plan: 121 res = self.client.get( 122 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 123 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 124 ) 125 self.assertEqual(res.status_code, 200) 126 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 127 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 128 129 def test_parse_outpost_no_perm(self): 130 """Test outposts's format""" 131 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 132 with patch( 133 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 134 MagicMock(return_value=outpost.user), 135 ): 136 res = self.client.get( 137 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 138 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 139 ) 140 self.assertEqual(res.status_code, 200) 141 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 142 143 def test_invalid_cert(self): 144 """Test invalid certificate""" 145 cert = create_test_cert() 146 with self.assertFlowFinishes() as plan: 147 res = self.client.get( 148 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 149 headers={ 150 "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data) 151 }, 152 ) 153 self.assertEqual(res.status_code, 200) 154 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 155 self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context) 156 157 def test_auth_no_user(self): 158 """Test auth with no user""" 159 User.objects.filter(username="client").delete() 160 res = self.client.get( 161 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 162 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 163 ) 164 self.assertEqual(res.status_code, 200) 165 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 166 167 def test_brand_ca(self): 168 """Test using a CA from the brand""" 169 self.stage.certificate_authorities.clear() 170 171 brand = create_test_brand() 172 brand.client_certificates.add(self.ca) 173 with self.assertFlowFinishes() as plan: 174 res = self.client.get( 175 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 176 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 177 ) 178 self.assertEqual(res.status_code, 200) 179 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 180 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 181 182 def test_no_ca_optional(self): 183 """Test using no CA Set""" 184 self.stage.mode = StageMode.OPTIONAL 185 self.stage.certificate_authorities.clear() 186 self.stage.save() 187 res = self.client.get( 188 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 189 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 190 ) 191 self.assertEqual(res.status_code, 200) 192 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 193 194 def test_no_ca_required(self): 195 """Test using no CA Set""" 196 self.stage.certificate_authorities.clear() 197 self.stage.save() 198 res = self.client.get( 199 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 200 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 201 ) 202 self.assertEqual(res.status_code, 200) 203 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 204 205 def test_no_cert_optional(self): 206 """Test using no cert Set""" 207 self.stage.mode = StageMode.OPTIONAL 208 self.stage.save() 209 res = self.client.get( 210 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 211 ) 212 self.assertEqual(res.status_code, 200) 213 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 214 215 def test_enroll(self): 216 """Test Enrollment flow""" 217 self.flow.designation = FlowDesignation.ENROLLMENT 218 self.flow.save() 219 with self.assertFlowFinishes() as plan: 220 res = self.client.get( 221 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 222 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 223 ) 224 self.assertEqual(res.status_code, 200) 225 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 226 self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"}) 227 self.assertEqual( 228 plan().context[PLAN_CONTEXT_CERTIFICATE], 229 { 230 "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a", 231 "fingerprint_sha256": ( 232 "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7" 233 ), 234 "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA", 235 "serial_number": "70153443448884702681996102271549704759327537151", 236 "subject": "CN=client", 237 }, 238 )
33@freeze_time("2026-05-10 12:38:46") 34class MTLSStageTests(FlowTestCase): 35 36 def setUp(self): 37 super().setUp() 38 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 39 self.ca = CertificateKeyPair.objects.create( 40 name=generate_id(), 41 certificate_data=load_fixture("fixtures/ca.pem"), 42 ) 43 self.stage = MutualTLSStage.objects.create( 44 name=generate_id(), 45 mode=StageMode.REQUIRED, 46 cert_attribute=CertAttributes.COMMON_NAME, 47 user_attribute=UserAttributes.USERNAME, 48 ) 49 50 self.stage.certificate_authorities.add(self.ca) 51 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0) 52 self.client_cert = load_fixture("fixtures/cert_client.pem") 53 # User matching the certificate 54 User.objects.filter(username="client").delete() 55 self.cert_user = create_test_user(username="client") 56 57 def _format_traefik(self, cert: str | None = None): 58 cert = cert if cert else self.client_cert 59 return cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "") 60 61 def test_parse_xfcc(self): 62 """Test authentik Proxy/Envoy's XFCC format""" 63 with self.assertFlowFinishes() as plan: 64 res = self.client.get( 65 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 66 headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}, 67 ) 68 self.assertEqual(res.status_code, 200) 69 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 70 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 71 72 def test_parse_nginx(self): 73 """Test nginx's format""" 74 with self.assertFlowFinishes() as plan: 75 res = self.client.get( 76 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 77 headers={"SSL-Client-Cert": quote_plus(self.client_cert)}, 78 ) 79 self.assertEqual(res.status_code, 200) 80 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 81 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 82 83 def test_parse_traefik(self): 84 """Test traefik's format""" 85 with self.assertFlowFinishes() as plan: 86 res = self.client.get( 87 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 88 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 89 ) 90 self.assertEqual(res.status_code, 200) 91 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 92 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 93 94 def test_parse_outpost_object(self): 95 """Test outposts's format""" 96 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 97 outpost.user.assign_perms_to_managed_role( 98 "authentik_stages_mtls.pass_outpost_certificate", self.stage 99 ) 100 with patch( 101 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 102 MagicMock(return_value=outpost.user), 103 ): 104 with self.assertFlowFinishes() as plan: 105 res = self.client.get( 106 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 107 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 108 ) 109 self.assertEqual(res.status_code, 200) 110 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 111 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 112 113 def test_parse_outpost_global(self): 114 """Test outposts's format""" 115 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 116 outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate") 117 with patch( 118 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 119 MagicMock(return_value=outpost.user), 120 ): 121 with self.assertFlowFinishes() as plan: 122 res = self.client.get( 123 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 124 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 125 ) 126 self.assertEqual(res.status_code, 200) 127 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 128 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 129 130 def test_parse_outpost_no_perm(self): 131 """Test outposts's format""" 132 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 133 with patch( 134 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 135 MagicMock(return_value=outpost.user), 136 ): 137 res = self.client.get( 138 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 139 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 140 ) 141 self.assertEqual(res.status_code, 200) 142 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 143 144 def test_invalid_cert(self): 145 """Test invalid certificate""" 146 cert = create_test_cert() 147 with self.assertFlowFinishes() as plan: 148 res = self.client.get( 149 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 150 headers={ 151 "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data) 152 }, 153 ) 154 self.assertEqual(res.status_code, 200) 155 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 156 self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context) 157 158 def test_auth_no_user(self): 159 """Test auth with no user""" 160 User.objects.filter(username="client").delete() 161 res = self.client.get( 162 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 163 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 164 ) 165 self.assertEqual(res.status_code, 200) 166 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 167 168 def test_brand_ca(self): 169 """Test using a CA from the brand""" 170 self.stage.certificate_authorities.clear() 171 172 brand = create_test_brand() 173 brand.client_certificates.add(self.ca) 174 with self.assertFlowFinishes() as plan: 175 res = self.client.get( 176 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 177 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 178 ) 179 self.assertEqual(res.status_code, 200) 180 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 181 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user) 182 183 def test_no_ca_optional(self): 184 """Test using no CA Set""" 185 self.stage.mode = StageMode.OPTIONAL 186 self.stage.certificate_authorities.clear() 187 self.stage.save() 188 res = self.client.get( 189 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 190 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 191 ) 192 self.assertEqual(res.status_code, 200) 193 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 194 195 def test_no_ca_required(self): 196 """Test using no CA Set""" 197 self.stage.certificate_authorities.clear() 198 self.stage.save() 199 res = self.client.get( 200 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 201 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 202 ) 203 self.assertEqual(res.status_code, 200) 204 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 205 206 def test_no_cert_optional(self): 207 """Test using no cert Set""" 208 self.stage.mode = StageMode.OPTIONAL 209 self.stage.save() 210 res = self.client.get( 211 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 212 ) 213 self.assertEqual(res.status_code, 200) 214 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 215 216 def test_enroll(self): 217 """Test Enrollment flow""" 218 self.flow.designation = FlowDesignation.ENROLLMENT 219 self.flow.save() 220 with self.assertFlowFinishes() as plan: 221 res = self.client.get( 222 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 223 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 224 ) 225 self.assertEqual(res.status_code, 200) 226 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 227 self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"}) 228 self.assertEqual( 229 plan().context[PLAN_CONTEXT_CERTIFICATE], 230 { 231 "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a", 232 "fingerprint_sha256": ( 233 "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7" 234 ), 235 "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA", 236 "serial_number": "70153443448884702681996102271549704759327537151", 237 "subject": "CN=client", 238 }, 239 )
Helpers for testing flows and stages.
def
setUp(*args: Any, **kwargs: Any) -> None:
692 def setUp(*args: Any, **kwargs: Any) -> None: 693 self.start() 694 if orig_setUp is not None: 695 orig_setUp(*args, **kwargs)
The type of the None singleton.
def
test_parse_xfcc(self):
61 def test_parse_xfcc(self): 62 """Test authentik Proxy/Envoy's XFCC format""" 63 with self.assertFlowFinishes() as plan: 64 res = self.client.get( 65 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 66 headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}, 67 ) 68 self.assertEqual(res.status_code, 200) 69 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 70 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test authentik Proxy/Envoy's XFCC format
def
test_parse_nginx(self):
72 def test_parse_nginx(self): 73 """Test nginx's format""" 74 with self.assertFlowFinishes() as plan: 75 res = self.client.get( 76 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 77 headers={"SSL-Client-Cert": quote_plus(self.client_cert)}, 78 ) 79 self.assertEqual(res.status_code, 200) 80 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 81 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test nginx's format
def
test_parse_traefik(self):
83 def test_parse_traefik(self): 84 """Test traefik's format""" 85 with self.assertFlowFinishes() as plan: 86 res = self.client.get( 87 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 88 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 89 ) 90 self.assertEqual(res.status_code, 200) 91 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 92 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test traefik's format
def
test_parse_outpost_object(self):
94 def test_parse_outpost_object(self): 95 """Test outposts's format""" 96 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 97 outpost.user.assign_perms_to_managed_role( 98 "authentik_stages_mtls.pass_outpost_certificate", self.stage 99 ) 100 with patch( 101 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 102 MagicMock(return_value=outpost.user), 103 ): 104 with self.assertFlowFinishes() as plan: 105 res = self.client.get( 106 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 107 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 108 ) 109 self.assertEqual(res.status_code, 200) 110 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 111 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test outposts's format
def
test_parse_outpost_global(self):
113 def test_parse_outpost_global(self): 114 """Test outposts's format""" 115 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 116 outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate") 117 with patch( 118 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 119 MagicMock(return_value=outpost.user), 120 ): 121 with self.assertFlowFinishes() as plan: 122 res = self.client.get( 123 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 124 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 125 ) 126 self.assertEqual(res.status_code, 200) 127 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 128 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test outposts's format
def
test_parse_outpost_no_perm(self):
130 def test_parse_outpost_no_perm(self): 131 """Test outposts's format""" 132 outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) 133 with patch( 134 "authentik.root.middleware.ClientIPMiddleware.get_outpost_user", 135 MagicMock(return_value=outpost.user), 136 ): 137 res = self.client.get( 138 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 139 headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}, 140 ) 141 self.assertEqual(res.status_code, 200) 142 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
Test outposts's format
def
test_invalid_cert(self):
144 def test_invalid_cert(self): 145 """Test invalid certificate""" 146 cert = create_test_cert() 147 with self.assertFlowFinishes() as plan: 148 res = self.client.get( 149 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 150 headers={ 151 "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data) 152 }, 153 ) 154 self.assertEqual(res.status_code, 200) 155 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied") 156 self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)
Test invalid certificate
def
test_auth_no_user(self):
158 def test_auth_no_user(self): 159 """Test auth with no user""" 160 User.objects.filter(username="client").delete() 161 res = self.client.get( 162 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 163 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 164 ) 165 self.assertEqual(res.status_code, 200) 166 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
Test auth with no user
def
test_brand_ca(self):
168 def test_brand_ca(self): 169 """Test using a CA from the brand""" 170 self.stage.certificate_authorities.clear() 171 172 brand = create_test_brand() 173 brand.client_certificates.add(self.ca) 174 with self.assertFlowFinishes() as plan: 175 res = self.client.get( 176 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 177 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 178 ) 179 self.assertEqual(res.status_code, 200) 180 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 181 self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
Test using a CA from the brand
def
test_no_ca_optional(self):
183 def test_no_ca_optional(self): 184 """Test using no CA Set""" 185 self.stage.mode = StageMode.OPTIONAL 186 self.stage.certificate_authorities.clear() 187 self.stage.save() 188 res = self.client.get( 189 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 190 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 191 ) 192 self.assertEqual(res.status_code, 200) 193 self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
Test using no CA Set
def
test_no_ca_required(self):
195 def test_no_ca_required(self): 196 """Test using no CA Set""" 197 self.stage.certificate_authorities.clear() 198 self.stage.save() 199 res = self.client.get( 200 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 201 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 202 ) 203 self.assertEqual(res.status_code, 200) 204 self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
Test using no CA Set
def
test_no_cert_optional(self):
206 def test_no_cert_optional(self): 207 """Test using no cert Set""" 208 self.stage.mode = StageMode.OPTIONAL 209 self.stage.save() 210 res = self.client.get( 211 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 212 ) 213 self.assertEqual(res.status_code, 200) 214 self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
Test using no cert Set
def
test_enroll(self):
216 def test_enroll(self): 217 """Test Enrollment flow""" 218 self.flow.designation = FlowDesignation.ENROLLMENT 219 self.flow.save() 220 with self.assertFlowFinishes() as plan: 221 res = self.client.get( 222 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}), 223 headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()}, 224 ) 225 self.assertEqual(res.status_code, 200) 226 self.assertStageRedirects(res, reverse("authentik_core:root-redirect")) 227 self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"}) 228 self.assertEqual( 229 plan().context[PLAN_CONTEXT_CERTIFICATE], 230 { 231 "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a", 232 "fingerprint_sha256": ( 233 "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7" 234 ), 235 "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA", 236 "serial_number": "70153443448884702681996102271549704759327537151", 237 "subject": "CN=client", 238 }, 239 )
Test Enrollment flow
@classmethod
def
setUpClass(cls: type) -> None:
671 @classmethod # type: ignore 672 def setUpClass(cls: type) -> None: 673 self.start() 674 if orig_setUpClass is not None: 675 orig_setUpClass() 676 self.stop()
The type of the None singleton.
@classmethod
def
tearDownClass(cls: type) -> None:
679 @classmethod # type: ignore 680 def tearDownClass(cls: type) -> None: 681 self.start() 682 if orig_tearDownClass is not None: 683 orig_tearDownClass() 684 self.stop()
The type of the None singleton.