authentik.enterprise.stages.mtls.tests.test_stage

  1from ssl import PEM_FOOTER, PEM_HEADER
  2from unittest.mock import MagicMock, patch
  3from urllib.parse import quote_plus
  4
  5from django.urls import reverse
  6from freezegun import freeze_time
  7
  8from authentik.core.models import User
  9from authentik.core.tests.utils import (
 10    create_test_brand,
 11    create_test_cert,
 12    create_test_flow,
 13    create_test_user,
 14)
 15from authentik.crypto.models import CertificateKeyPair
 16from authentik.endpoints.models import StageMode
 17from authentik.enterprise.stages.mtls.models import (
 18    CertAttributes,
 19    MutualTLSStage,
 20    UserAttributes,
 21)
 22from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE
 23from authentik.flows.models import FlowDesignation, FlowStageBinding
 24from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 25from authentik.flows.tests import FlowTestCase
 26from authentik.lib.generators import generate_id
 27from authentik.lib.tests.utils import load_fixture
 28from authentik.outposts.models import Outpost, OutpostType
 29from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 30
 31
 32@freeze_time("2026-05-10 12:38:46")
 33class MTLSStageTests(FlowTestCase):
 34
 35    def setUp(self):
 36        super().setUp()
 37        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 38        self.ca = CertificateKeyPair.objects.create(
 39            name=generate_id(),
 40            certificate_data=load_fixture("fixtures/ca.pem"),
 41        )
 42        self.stage = MutualTLSStage.objects.create(
 43            name=generate_id(),
 44            mode=StageMode.REQUIRED,
 45            cert_attribute=CertAttributes.COMMON_NAME,
 46            user_attribute=UserAttributes.USERNAME,
 47        )
 48
 49        self.stage.certificate_authorities.add(self.ca)
 50        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 51        self.client_cert = load_fixture("fixtures/cert_client.pem")
 52        # User matching the certificate
 53        User.objects.filter(username="client").delete()
 54        self.cert_user = create_test_user(username="client")
 55
 56    def _format_traefik(self, cert: str | None = None):
 57        cert = cert if cert else self.client_cert
 58        return cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "")
 59
 60    def test_parse_xfcc(self):
 61        """Test authentik Proxy/Envoy's XFCC format"""
 62        with self.assertFlowFinishes() as plan:
 63            res = self.client.get(
 64                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 65                headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"},
 66            )
 67            self.assertEqual(res.status_code, 200)
 68            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 69        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 70
 71    def test_parse_nginx(self):
 72        """Test nginx's format"""
 73        with self.assertFlowFinishes() as plan:
 74            res = self.client.get(
 75                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 76                headers={"SSL-Client-Cert": quote_plus(self.client_cert)},
 77            )
 78            self.assertEqual(res.status_code, 200)
 79            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 80        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 81
 82    def test_parse_traefik(self):
 83        """Test traefik's format"""
 84        with self.assertFlowFinishes() as plan:
 85            res = self.client.get(
 86                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 87                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
 88            )
 89            self.assertEqual(res.status_code, 200)
 90            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 91        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 92
 93    def test_parse_outpost_object(self):
 94        """Test outposts's format"""
 95        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
 96        outpost.user.assign_perms_to_managed_role(
 97            "authentik_stages_mtls.pass_outpost_certificate", self.stage
 98        )
 99        with patch(
100            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
101            MagicMock(return_value=outpost.user),
102        ):
103            with self.assertFlowFinishes() as plan:
104                res = self.client.get(
105                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
106                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
107                )
108                self.assertEqual(res.status_code, 200)
109                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
110            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
111
112    def test_parse_outpost_global(self):
113        """Test outposts's format"""
114        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
115        outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
116        with patch(
117            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
118            MagicMock(return_value=outpost.user),
119        ):
120            with self.assertFlowFinishes() as plan:
121                res = self.client.get(
122                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
123                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
124                )
125                self.assertEqual(res.status_code, 200)
126                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
127            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
128
129    def test_parse_outpost_no_perm(self):
130        """Test outposts's format"""
131        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
132        with patch(
133            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
134            MagicMock(return_value=outpost.user),
135        ):
136            res = self.client.get(
137                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
138                headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
139            )
140            self.assertEqual(res.status_code, 200)
141            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
142
143    def test_invalid_cert(self):
144        """Test invalid certificate"""
145        cert = create_test_cert()
146        with self.assertFlowFinishes() as plan:
147            res = self.client.get(
148                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
149                headers={
150                    "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data)
151                },
152            )
153            self.assertEqual(res.status_code, 200)
154            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
155        self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)
156
157    def test_auth_no_user(self):
158        """Test auth with no user"""
159        User.objects.filter(username="client").delete()
160        res = self.client.get(
161            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
162            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
163        )
164        self.assertEqual(res.status_code, 200)
165        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
166
167    def test_brand_ca(self):
168        """Test using a CA from the brand"""
169        self.stage.certificate_authorities.clear()
170
171        brand = create_test_brand()
172        brand.client_certificates.add(self.ca)
173        with self.assertFlowFinishes() as plan:
174            res = self.client.get(
175                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
176                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
177            )
178            self.assertEqual(res.status_code, 200)
179            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
180        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
181
182    def test_no_ca_optional(self):
183        """Test using no CA Set"""
184        self.stage.mode = StageMode.OPTIONAL
185        self.stage.certificate_authorities.clear()
186        self.stage.save()
187        res = self.client.get(
188            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
189            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
190        )
191        self.assertEqual(res.status_code, 200)
192        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
193
194    def test_no_ca_required(self):
195        """Test using no CA Set"""
196        self.stage.certificate_authorities.clear()
197        self.stage.save()
198        res = self.client.get(
199            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
200            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
201        )
202        self.assertEqual(res.status_code, 200)
203        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
204
205    def test_no_cert_optional(self):
206        """Test using no cert Set"""
207        self.stage.mode = StageMode.OPTIONAL
208        self.stage.save()
209        res = self.client.get(
210            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
211        )
212        self.assertEqual(res.status_code, 200)
213        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
214
215    def test_enroll(self):
216        """Test Enrollment flow"""
217        self.flow.designation = FlowDesignation.ENROLLMENT
218        self.flow.save()
219        with self.assertFlowFinishes() as plan:
220            res = self.client.get(
221                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
222                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
223            )
224            self.assertEqual(res.status_code, 200)
225            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
226        self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
227        self.assertEqual(
228            plan().context[PLAN_CONTEXT_CERTIFICATE],
229            {
230                "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a",
231                "fingerprint_sha256": (
232                    "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7"
233                ),
234                "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
235                "serial_number": "70153443448884702681996102271549704759327537151",
236                "subject": "CN=client",
237            },
238        )
@freeze_time('2026-05-10 12:38:46')
class MTLSStageTests(authentik.flows.tests.FlowTestCase):
 33@freeze_time("2026-05-10 12:38:46")
 34class MTLSStageTests(FlowTestCase):
 35
 36    def setUp(self):
 37        super().setUp()
 38        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 39        self.ca = CertificateKeyPair.objects.create(
 40            name=generate_id(),
 41            certificate_data=load_fixture("fixtures/ca.pem"),
 42        )
 43        self.stage = MutualTLSStage.objects.create(
 44            name=generate_id(),
 45            mode=StageMode.REQUIRED,
 46            cert_attribute=CertAttributes.COMMON_NAME,
 47            user_attribute=UserAttributes.USERNAME,
 48        )
 49
 50        self.stage.certificate_authorities.add(self.ca)
 51        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 52        self.client_cert = load_fixture("fixtures/cert_client.pem")
 53        # User matching the certificate
 54        User.objects.filter(username="client").delete()
 55        self.cert_user = create_test_user(username="client")
 56
 57    def _format_traefik(self, cert: str | None = None):
 58        cert = cert if cert else self.client_cert
 59        return cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "")
 60
 61    def test_parse_xfcc(self):
 62        """Test authentik Proxy/Envoy's XFCC format"""
 63        with self.assertFlowFinishes() as plan:
 64            res = self.client.get(
 65                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 66                headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"},
 67            )
 68            self.assertEqual(res.status_code, 200)
 69            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 70        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 71
 72    def test_parse_nginx(self):
 73        """Test nginx's format"""
 74        with self.assertFlowFinishes() as plan:
 75            res = self.client.get(
 76                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 77                headers={"SSL-Client-Cert": quote_plus(self.client_cert)},
 78            )
 79            self.assertEqual(res.status_code, 200)
 80            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 81        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 82
 83    def test_parse_traefik(self):
 84        """Test traefik's format"""
 85        with self.assertFlowFinishes() as plan:
 86            res = self.client.get(
 87                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 88                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
 89            )
 90            self.assertEqual(res.status_code, 200)
 91            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 92        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 93
 94    def test_parse_outpost_object(self):
 95        """Test outposts's format"""
 96        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
 97        outpost.user.assign_perms_to_managed_role(
 98            "authentik_stages_mtls.pass_outpost_certificate", self.stage
 99        )
100        with patch(
101            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
102            MagicMock(return_value=outpost.user),
103        ):
104            with self.assertFlowFinishes() as plan:
105                res = self.client.get(
106                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
107                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
108                )
109                self.assertEqual(res.status_code, 200)
110                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
111            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
112
113    def test_parse_outpost_global(self):
114        """Test outposts's format"""
115        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
116        outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
117        with patch(
118            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
119            MagicMock(return_value=outpost.user),
120        ):
121            with self.assertFlowFinishes() as plan:
122                res = self.client.get(
123                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
124                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
125                )
126                self.assertEqual(res.status_code, 200)
127                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
128            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
129
130    def test_parse_outpost_no_perm(self):
131        """Test outposts's format"""
132        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
133        with patch(
134            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
135            MagicMock(return_value=outpost.user),
136        ):
137            res = self.client.get(
138                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
139                headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
140            )
141            self.assertEqual(res.status_code, 200)
142            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
143
144    def test_invalid_cert(self):
145        """Test invalid certificate"""
146        cert = create_test_cert()
147        with self.assertFlowFinishes() as plan:
148            res = self.client.get(
149                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
150                headers={
151                    "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data)
152                },
153            )
154            self.assertEqual(res.status_code, 200)
155            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
156        self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)
157
158    def test_auth_no_user(self):
159        """Test auth with no user"""
160        User.objects.filter(username="client").delete()
161        res = self.client.get(
162            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
163            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
164        )
165        self.assertEqual(res.status_code, 200)
166        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
167
168    def test_brand_ca(self):
169        """Test using a CA from the brand"""
170        self.stage.certificate_authorities.clear()
171
172        brand = create_test_brand()
173        brand.client_certificates.add(self.ca)
174        with self.assertFlowFinishes() as plan:
175            res = self.client.get(
176                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
177                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
178            )
179            self.assertEqual(res.status_code, 200)
180            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
181        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
182
183    def test_no_ca_optional(self):
184        """Test using no CA Set"""
185        self.stage.mode = StageMode.OPTIONAL
186        self.stage.certificate_authorities.clear()
187        self.stage.save()
188        res = self.client.get(
189            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
190            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
191        )
192        self.assertEqual(res.status_code, 200)
193        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
194
195    def test_no_ca_required(self):
196        """Test using no CA Set"""
197        self.stage.certificate_authorities.clear()
198        self.stage.save()
199        res = self.client.get(
200            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
201            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
202        )
203        self.assertEqual(res.status_code, 200)
204        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
205
206    def test_no_cert_optional(self):
207        """Test using no cert Set"""
208        self.stage.mode = StageMode.OPTIONAL
209        self.stage.save()
210        res = self.client.get(
211            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
212        )
213        self.assertEqual(res.status_code, 200)
214        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
215
216    def test_enroll(self):
217        """Test Enrollment flow"""
218        self.flow.designation = FlowDesignation.ENROLLMENT
219        self.flow.save()
220        with self.assertFlowFinishes() as plan:
221            res = self.client.get(
222                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
223                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
224            )
225            self.assertEqual(res.status_code, 200)
226            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
227        self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
228        self.assertEqual(
229            plan().context[PLAN_CONTEXT_CERTIFICATE],
230            {
231                "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a",
232                "fingerprint_sha256": (
233                    "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7"
234                ),
235                "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
236                "serial_number": "70153443448884702681996102271549704759327537151",
237                "subject": "CN=client",
238            },
239        )

Helpers for testing flows and stages.

def setUp(*args: Any, **kwargs: Any) -> None:
692            def setUp(*args: Any, **kwargs: Any) -> None:
693                self.start()
694                if orig_setUp is not None:
695                    orig_setUp(*args, **kwargs)

The type of the None singleton.

def test_parse_xfcc(self):
61    def test_parse_xfcc(self):
62        """Test authentik Proxy/Envoy's XFCC format"""
63        with self.assertFlowFinishes() as plan:
64            res = self.client.get(
65                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
66                headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"},
67            )
68            self.assertEqual(res.status_code, 200)
69            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
70        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test authentik Proxy/Envoy's XFCC format

def test_parse_nginx(self):
72    def test_parse_nginx(self):
73        """Test nginx's format"""
74        with self.assertFlowFinishes() as plan:
75            res = self.client.get(
76                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
77                headers={"SSL-Client-Cert": quote_plus(self.client_cert)},
78            )
79            self.assertEqual(res.status_code, 200)
80            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
81        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test nginx's format

def test_parse_traefik(self):
83    def test_parse_traefik(self):
84        """Test traefik's format"""
85        with self.assertFlowFinishes() as plan:
86            res = self.client.get(
87                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
88                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
89            )
90            self.assertEqual(res.status_code, 200)
91            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
92        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test traefik's format

def test_parse_outpost_object(self):
 94    def test_parse_outpost_object(self):
 95        """Test outposts's format"""
 96        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
 97        outpost.user.assign_perms_to_managed_role(
 98            "authentik_stages_mtls.pass_outpost_certificate", self.stage
 99        )
100        with patch(
101            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
102            MagicMock(return_value=outpost.user),
103        ):
104            with self.assertFlowFinishes() as plan:
105                res = self.client.get(
106                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
107                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
108                )
109                self.assertEqual(res.status_code, 200)
110                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
111            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test outposts's format

def test_parse_outpost_global(self):
113    def test_parse_outpost_global(self):
114        """Test outposts's format"""
115        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
116        outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
117        with patch(
118            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
119            MagicMock(return_value=outpost.user),
120        ):
121            with self.assertFlowFinishes() as plan:
122                res = self.client.get(
123                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
124                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
125                )
126                self.assertEqual(res.status_code, 200)
127                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
128            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test outposts's format

def test_parse_outpost_no_perm(self):
130    def test_parse_outpost_no_perm(self):
131        """Test outposts's format"""
132        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
133        with patch(
134            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
135            MagicMock(return_value=outpost.user),
136        ):
137            res = self.client.get(
138                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
139                headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
140            )
141            self.assertEqual(res.status_code, 200)
142            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

Test outposts's format

def test_invalid_cert(self):
144    def test_invalid_cert(self):
145        """Test invalid certificate"""
146        cert = create_test_cert()
147        with self.assertFlowFinishes() as plan:
148            res = self.client.get(
149                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
150                headers={
151                    "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data)
152                },
153            )
154            self.assertEqual(res.status_code, 200)
155            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
156        self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)

Test invalid certificate

def test_auth_no_user(self):
158    def test_auth_no_user(self):
159        """Test auth with no user"""
160        User.objects.filter(username="client").delete()
161        res = self.client.get(
162            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
163            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
164        )
165        self.assertEqual(res.status_code, 200)
166        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

Test auth with no user

def test_brand_ca(self):
168    def test_brand_ca(self):
169        """Test using a CA from the brand"""
170        self.stage.certificate_authorities.clear()
171
172        brand = create_test_brand()
173        brand.client_certificates.add(self.ca)
174        with self.assertFlowFinishes() as plan:
175            res = self.client.get(
176                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
177                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
178            )
179            self.assertEqual(res.status_code, 200)
180            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
181        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test using a CA from the brand

def test_no_ca_optional(self):
183    def test_no_ca_optional(self):
184        """Test using no CA Set"""
185        self.stage.mode = StageMode.OPTIONAL
186        self.stage.certificate_authorities.clear()
187        self.stage.save()
188        res = self.client.get(
189            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
190            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
191        )
192        self.assertEqual(res.status_code, 200)
193        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))

Test using no CA Set

def test_no_ca_required(self):
195    def test_no_ca_required(self):
196        """Test using no CA Set"""
197        self.stage.certificate_authorities.clear()
198        self.stage.save()
199        res = self.client.get(
200            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
201            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
202        )
203        self.assertEqual(res.status_code, 200)
204        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

Test using no CA Set

def test_no_cert_optional(self):
206    def test_no_cert_optional(self):
207        """Test using no cert Set"""
208        self.stage.mode = StageMode.OPTIONAL
209        self.stage.save()
210        res = self.client.get(
211            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
212        )
213        self.assertEqual(res.status_code, 200)
214        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))

Test using no cert Set

def test_enroll(self):
216    def test_enroll(self):
217        """Test Enrollment flow"""
218        self.flow.designation = FlowDesignation.ENROLLMENT
219        self.flow.save()
220        with self.assertFlowFinishes() as plan:
221            res = self.client.get(
222                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
223                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
224            )
225            self.assertEqual(res.status_code, 200)
226            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
227        self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
228        self.assertEqual(
229            plan().context[PLAN_CONTEXT_CERTIFICATE],
230            {
231                "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a",
232                "fingerprint_sha256": (
233                    "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7"
234                ),
235                "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
236                "serial_number": "70153443448884702681996102271549704759327537151",
237                "subject": "CN=client",
238            },
239        )

Test Enrollment flow

@classmethod
def setUpClass(cls: type) -> None:
671            @classmethod  # type: ignore
672            def setUpClass(cls: type) -> None:
673                self.start()
674                if orig_setUpClass is not None:
675                    orig_setUpClass()
676                self.stop()

The type of the None singleton.

@classmethod
def tearDownClass(cls: type) -> None:
679            @classmethod  # type: ignore
680            def tearDownClass(cls: type) -> None:
681                self.start()
682                if orig_tearDownClass is not None:
683                    orig_tearDownClass()
684                self.stop()

The type of the None singleton.

def tearDown(*args: Any, **kwargs: Any) -> None:
697            def tearDown(*args: Any, **kwargs: Any) -> None:
698                if orig_tearDown is not None:
699                    orig_tearDown(*args, **kwargs)
700                self.stop()

The type of the None singleton.