authentik.enterprise.stages.mtls.tests.test_stage

  1from ssl import PEM_FOOTER, PEM_HEADER
  2from unittest.mock import MagicMock, patch
  3from urllib.parse import quote_plus
  4
  5from django.urls import reverse
  6
  7from authentik.core.models import User
  8from authentik.core.tests.utils import (
  9    create_test_brand,
 10    create_test_cert,
 11    create_test_flow,
 12    create_test_user,
 13)
 14from authentik.crypto.models import CertificateKeyPair
 15from authentik.endpoints.models import StageMode
 16from authentik.enterprise.stages.mtls.models import (
 17    CertAttributes,
 18    MutualTLSStage,
 19    UserAttributes,
 20)
 21from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE
 22from authentik.flows.models import FlowDesignation, FlowStageBinding
 23from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 24from authentik.flows.tests import FlowTestCase
 25from authentik.lib.generators import generate_id
 26from authentik.lib.tests.utils import load_fixture
 27from authentik.outposts.models import Outpost, OutpostType
 28from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 29
 30
 31class MTLSStageTests(FlowTestCase):
 32
 33    def setUp(self):
 34        super().setUp()
 35        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 36        self.ca = CertificateKeyPair.objects.create(
 37            name=generate_id(),
 38            certificate_data=load_fixture("fixtures/ca.pem"),
 39        )
 40        self.stage = MutualTLSStage.objects.create(
 41            name=generate_id(),
 42            mode=StageMode.REQUIRED,
 43            cert_attribute=CertAttributes.COMMON_NAME,
 44            user_attribute=UserAttributes.USERNAME,
 45        )
 46
 47        self.stage.certificate_authorities.add(self.ca)
 48        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 49        self.client_cert = load_fixture("fixtures/cert_client.pem")
 50        # User matching the certificate
 51        User.objects.filter(username="client").delete()
 52        self.cert_user = create_test_user(username="client")
 53
 54    def _format_traefik(self, cert: str | None = None):
 55        cert = cert if cert else self.client_cert
 56        return quote_plus(cert.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", ""))
 57
 58    def test_parse_xfcc(self):
 59        """Test authentik Proxy/Envoy's XFCC format"""
 60        with self.assertFlowFinishes() as plan:
 61            res = self.client.get(
 62                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 63                headers={"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"},
 64            )
 65            self.assertEqual(res.status_code, 200)
 66            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 67        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 68
 69    def test_parse_nginx(self):
 70        """Test nginx's format"""
 71        with self.assertFlowFinishes() as plan:
 72            res = self.client.get(
 73                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 74                headers={"SSL-Client-Cert": quote_plus(self.client_cert)},
 75            )
 76            self.assertEqual(res.status_code, 200)
 77            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 78        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 79
 80    def test_parse_traefik(self):
 81        """Test traefik's format"""
 82        with self.assertFlowFinishes() as plan:
 83            res = self.client.get(
 84                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 85                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
 86            )
 87            self.assertEqual(res.status_code, 200)
 88            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
 89        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
 90
 91    def test_parse_outpost_object(self):
 92        """Test outposts's format"""
 93        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
 94        outpost.user.assign_perms_to_managed_role(
 95            "authentik_stages_mtls.pass_outpost_certificate", self.stage
 96        )
 97        with patch(
 98            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
 99            MagicMock(return_value=outpost.user),
100        ):
101            with self.assertFlowFinishes() as plan:
102                res = self.client.get(
103                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
104                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
105                )
106                self.assertEqual(res.status_code, 200)
107                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
108            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
109
110    def test_parse_outpost_global(self):
111        """Test outposts's format"""
112        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
113        outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
114        with patch(
115            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
116            MagicMock(return_value=outpost.user),
117        ):
118            with self.assertFlowFinishes() as plan:
119                res = self.client.get(
120                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
121                    headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
122                )
123                self.assertEqual(res.status_code, 200)
124                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
125            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
126
127    def test_parse_outpost_no_perm(self):
128        """Test outposts's format"""
129        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
130        with patch(
131            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
132            MagicMock(return_value=outpost.user),
133        ):
134            res = self.client.get(
135                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
136                headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
137            )
138            self.assertEqual(res.status_code, 200)
139            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
140
141    def test_invalid_cert(self):
142        """Test invalid certificate"""
143        cert = create_test_cert()
144        with self.assertFlowFinishes() as plan:
145            res = self.client.get(
146                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
147                headers={
148                    "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data)
149                },
150            )
151            self.assertEqual(res.status_code, 200)
152            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
153        self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)
154
155    def test_auth_no_user(self):
156        """Test auth with no user"""
157        User.objects.filter(username="client").delete()
158        res = self.client.get(
159            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
160            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
161        )
162        self.assertEqual(res.status_code, 200)
163        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
164
165    def test_brand_ca(self):
166        """Test using a CA from the brand"""
167        self.stage.certificate_authorities.clear()
168
169        brand = create_test_brand()
170        brand.client_certificates.add(self.ca)
171        with self.assertFlowFinishes() as plan:
172            res = self.client.get(
173                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
174                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
175            )
176            self.assertEqual(res.status_code, 200)
177            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
178        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)
179
180    def test_no_ca_optional(self):
181        """Test using no CA Set"""
182        self.stage.mode = StageMode.OPTIONAL
183        self.stage.certificate_authorities.clear()
184        self.stage.save()
185        res = self.client.get(
186            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
187            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
188        )
189        self.assertEqual(res.status_code, 200)
190        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
191
192    def test_no_ca_required(self):
193        """Test using no CA Set"""
194        self.stage.certificate_authorities.clear()
195        self.stage.save()
196        res = self.client.get(
197            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
198            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
199        )
200        self.assertEqual(res.status_code, 200)
201        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
202
203    def test_no_cert_optional(self):
204        """Test using no cert Set"""
205        self.stage.mode = StageMode.OPTIONAL
206        self.stage.save()
207        res = self.client.get(
208            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
209        )
210        self.assertEqual(res.status_code, 200)
211        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
212
213    def test_enroll(self):
214        """Test Enrollment flow"""
215        self.flow.designation = FlowDesignation.ENROLLMENT
216        self.flow.save()
217        with self.assertFlowFinishes() as plan:
218            res = self.client.get(
219                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
220                headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
221            )
222            self.assertEqual(res.status_code, 200)
223            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
224        self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
225        self.assertEqual(
226            plan().context[PLAN_CONTEXT_CERTIFICATE],
227            {
228                "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a",
229                "fingerprint_sha256": (
230                    "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7"
231                ),
232                "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
233                "serial_number": "70153443448884702681996102271549704759327537151",
234                "subject": "CN=client",
235            },
236        )
class MTLSStageTests(FlowTestCase):
    """Flow-level tests for the Mutual TLS stage.

    Each test sends a GET request to the flow executor API for a flow whose
    only stage is a ``MutualTLSStage`` and checks the executor's response.
    The client certificate is passed in one of several request headers.
    """

    def setUp(self):
        """Create the flow, CA, stage binding, client cert and matching user."""
        super().setUp()
        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        self.ca = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data=load_fixture("fixtures/ca.pem"),
        )
        self.stage = MutualTLSStage.objects.create(
            name=generate_id(),
            mode=StageMode.REQUIRED,
            cert_attribute=CertAttributes.COMMON_NAME,
            user_attribute=UserAttributes.USERNAME,
        )

        self.stage.certificate_authorities.add(self.ca)
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.client_cert = load_fixture("fixtures/cert_client.pem")
        # User matching the certificate; any pre-existing "client" user is removed first
        User.objects.filter(username="client").delete()
        self.cert_user = create_test_user(username="client")

    def _format_traefik(self, cert: str | None = None) -> str:
        """Return ``cert`` with PEM header, footer and newlines removed, URL-quoted.

        Falls back to the client fixture certificate when ``cert`` is empty or None.
        """
        pem = cert or self.client_cert
        stripped = pem.replace(PEM_HEADER, "").replace(PEM_FOOTER, "").replace("\n", "")
        return quote_plus(stripped)

    def _request_executor(self, headers: dict[str, str] | None = None):
        """GET the flow executor API for ``self.flow`` with optional request headers."""
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        return self.client.get(url, headers=headers)

    def test_parse_xfcc(self):
        """Test authentik Proxy/Envoy's XFCC format"""
        with self.assertFlowFinishes() as plan:
            res = self._request_executor(
                {"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}
            )
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        # plan() returns the captured flow plan
        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

    def test_parse_nginx(self):
        """Test nginx's format"""
        with self.assertFlowFinishes() as plan:
            res = self._request_executor({"SSL-Client-Cert": quote_plus(self.client_cert)})
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

    def test_parse_traefik(self):
        """Test traefik's format"""
        with self.assertFlowFinishes() as plan:
            res = self._request_executor({"X-Forwarded-TLS-Client-Cert": self._format_traefik()})
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

    def test_parse_outpost_object(self):
        """Test the outpost header format, permission assigned for this stage object"""
        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
        outpost.user.assign_perms_to_managed_role(
            "authentik_stages_mtls.pass_outpost_certificate", self.stage
        )
        # Make the middleware report the request as coming from this outpost's user
        with patch(
            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
            MagicMock(return_value=outpost.user),
        ):
            with self.assertFlowFinishes() as plan:
                res = self._request_executor(
                    {"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}
                )
                self.assertEqual(res.status_code, 200)
                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

    def test_parse_outpost_global(self):
        """Test the outpost header format, permission assigned without a target object"""
        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
        outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
        with patch(
            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
            MagicMock(return_value=outpost.user),
        ):
            with self.assertFlowFinishes() as plan:
                res = self._request_executor(
                    {"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}
                )
                self.assertEqual(res.status_code, 200)
                self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
            self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

    def test_parse_outpost_no_perm(self):
        """Test the outpost header format when the outpost has no permission assigned"""
        outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
        with patch(
            "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
            MagicMock(return_value=outpost.user),
        ):
            res = self._request_executor(
                {"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)}
            )
            self.assertEqual(res.status_code, 200)
            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

    def test_invalid_cert(self):
        """Test invalid certificate"""
        # Freshly created test cert; presumably not issued by the CA added in setUp
        cert = create_test_cert()
        with self.assertFlowFinishes() as plan:
            res = self._request_executor(
                {"X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data)}
            )
            self.assertEqual(res.status_code, 200)
            self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
        self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)

    def test_auth_no_user(self):
        """Test auth with no user matching the certificate"""
        User.objects.filter(username="client").delete()
        res = self._request_executor({"X-Forwarded-TLS-Client-Cert": self._format_traefik()})
        self.assertEqual(res.status_code, 200)
        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

    def test_brand_ca(self):
        """Test using a CA from the brand instead of the stage"""
        self.stage.certificate_authorities.clear()

        brand = create_test_brand()
        brand.client_certificates.add(self.ca)
        with self.assertFlowFinishes() as plan:
            res = self._request_executor({"X-Forwarded-TLS-Client-Cert": self._format_traefik()})
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

    def test_no_ca_optional(self):
        """Test optional mode with no CA set on the stage: the flow continues"""
        self.stage.mode = StageMode.OPTIONAL
        self.stage.certificate_authorities.clear()
        self.stage.save()
        res = self._request_executor({"X-Forwarded-TLS-Client-Cert": self._format_traefik()})
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))

    def test_no_ca_required(self):
        """Test required mode with no CA set on the stage: access is denied"""
        self.stage.certificate_authorities.clear()
        self.stage.save()
        res = self._request_executor({"X-Forwarded-TLS-Client-Cert": self._format_traefik()})
        self.assertEqual(res.status_code, 200)
        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

    def test_no_cert_optional(self):
        """Test optional mode when the request carries no certificate header"""
        self.stage.mode = StageMode.OPTIONAL
        self.stage.save()
        res = self._request_executor()
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))

    def test_enroll(self):
        """Test Enrollment flow"""
        self.flow.designation = FlowDesignation.ENROLLMENT
        self.flow.save()
        with self.assertFlowFinishes() as plan:
            res = self._request_executor({"X-Forwarded-TLS-Client-Cert": self._format_traefik()})
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
        # Expected values are presumably fixed properties of fixtures/cert_client.pem
        self.assertEqual(
            plan().context[PLAN_CONTEXT_CERTIFICATE],
            {
                "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a",
                "fingerprint_sha256": (
                    "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7"
                ),
                "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
                "serial_number": "70153443448884702681996102271549704759327537151",
                "subject": "CN=client",
            },
        )

Helpers for testing flows and stages.

def setUp(self):
    """Create the flow, CA, stage binding, client cert and matching user."""
    super().setUp()
    self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    self.ca = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=load_fixture("fixtures/ca.pem"),
    )
    self.stage = MutualTLSStage.objects.create(
        name=generate_id(),
        mode=StageMode.REQUIRED,
        cert_attribute=CertAttributes.COMMON_NAME,
        user_attribute=UserAttributes.USERNAME,
    )

    self.stage.certificate_authorities.add(self.ca)
    self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
    self.client_cert = load_fixture("fixtures/cert_client.pem")
    # User matching the certificate; any pre-existing "client" user is removed first
    User.objects.filter(username="client").delete()
    self.cert_user = create_test_user(username="client")

Hook method for setting up the test fixture before exercising it.

def test_parse_xfcc(self):
    """Test authentik Proxy/Envoy's XFCC format"""
    # The URL-quoted PEM is sent as the ``Cert=`` element of the header value
    xfcc_headers = {"X-Forwarded-Client-Cert": f"Cert={quote_plus(self.client_cert)}"}
    with self.assertFlowFinishes() as plan:
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers=xfcc_headers,
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
    # plan() returns the captured flow plan
    self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test authentik Proxy/Envoy's XFCC format

def test_parse_nginx(self):
    """Test nginx's format"""
    # The whole PEM, URL-quoted, is the header value
    nginx_headers = {"SSL-Client-Cert": quote_plus(self.client_cert)}
    with self.assertFlowFinishes() as plan:
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers=nginx_headers,
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
    self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test nginx's format

def test_parse_traefik(self):
    """Test traefik's format"""
    # _format_traefik() with no argument formats the client fixture certificate
    traefik_headers = {"X-Forwarded-TLS-Client-Cert": self._format_traefik()}
    with self.assertFlowFinishes() as plan:
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers=traefik_headers,
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
    self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test traefik's format

def test_parse_outpost_object(self):
    """Test the outpost header format, permission assigned for this stage object"""
    outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
    outpost.user.assign_perms_to_managed_role(
        "authentik_stages_mtls.pass_outpost_certificate", self.stage
    )
    # Make the middleware report the request as coming from this outpost's user
    with patch(
        "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
        MagicMock(return_value=outpost.user),
    ):
        with self.assertFlowFinishes() as plan:
            res = self.client.get(
                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
                headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
            )
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test the outpost's format

def test_parse_outpost_global(self):
    """Test the outpost header format, permission assigned without a target object"""
    outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
    outpost.user.assign_perms_to_managed_role("authentik_stages_mtls.pass_outpost_certificate")
    # Make the middleware report the request as coming from this outpost's user
    with patch(
        "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
        MagicMock(return_value=outpost.user),
    ):
        with self.assertFlowFinishes() as plan:
            res = self.client.get(
                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
                headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
            )
            self.assertEqual(res.status_code, 200)
            self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
        self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test the outpost's format

def test_parse_outpost_no_perm(self):
    """Test the outpost header format when the outpost has no permission assigned"""
    # No assign_perms_to_managed_role() call here, unlike the other outpost tests
    outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
    with patch(
        "authentik.root.middleware.ClientIPMiddleware.get_outpost_user",
        MagicMock(return_value=outpost.user),
    ):
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers={"X-Authentik-Outpost-Certificate": quote_plus(self.client_cert)},
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

Test the outpost's format

def test_invalid_cert(self):
    """Test invalid certificate"""
    # Freshly created test cert; presumably not issued by the CA added in setUp
    cert = create_test_cert()
    untrusted_headers = {
        "X-Forwarded-TLS-Client-Cert": self._format_traefik(cert.certificate_data)
    }
    with self.assertFlowFinishes() as plan:
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers=untrusted_headers,
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")
    # No pending user may be set in the plan context for a rejected certificate
    self.assertNotIn(PLAN_CONTEXT_PENDING_USER, plan().context)

Test invalid certificate

def test_auth_no_user(self):
    """Test auth with no user matching the certificate"""
    # Remove the "client" user so no user with that username exists
    User.objects.filter(username="client").delete()
    res = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
    )
    self.assertEqual(res.status_code, 200)
    self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

Test auth with no user

def test_brand_ca(self):
    """Test using a CA from the brand instead of the stage"""
    # The stage itself has no CA; the same CA is attached to a test brand instead
    self.stage.certificate_authorities.clear()

    brand = create_test_brand()
    brand.client_certificates.add(self.ca)
    with self.assertFlowFinishes() as plan:
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
    self.assertEqual(plan().context[PLAN_CONTEXT_PENDING_USER], self.cert_user)

Test using a CA from the brand

def test_no_ca_optional(self):
    """Test optional mode with no CA set on the stage: the flow continues"""
    self.stage.mode = StageMode.OPTIONAL
    self.stage.certificate_authorities.clear()
    self.stage.save()
    res = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
    )
    self.assertEqual(res.status_code, 200)
    self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))

Test using no CA Set

def test_no_ca_required(self):
    """Test required mode with no CA set on the stage: access is denied"""
    # Mode stays REQUIRED as created in setUp; only the CA set is emptied
    self.stage.certificate_authorities.clear()
    self.stage.save()
    res = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
    )
    self.assertEqual(res.status_code, 200)
    self.assertStageResponse(res, self.flow, component="ak-stage-access-denied")

Test using no CA Set

def test_no_cert_optional(self):
    """Test optional mode when the request carries no certificate header"""
    self.stage.mode = StageMode.OPTIONAL
    self.stage.save()
    # No headers are passed, so no client certificate reaches the stage
    res = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
    )
    self.assertEqual(res.status_code, 200)
    self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))

Test using no cert Set

def test_enroll(self):
    """Test Enrollment flow"""
    self.flow.designation = FlowDesignation.ENROLLMENT
    self.flow.save()
    with self.assertFlowFinishes() as plan:
        res = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            headers={"X-Forwarded-TLS-Client-Cert": self._format_traefik()},
        )
        self.assertEqual(res.status_code, 200)
        self.assertStageRedirects(res, reverse("authentik_core:root-redirect"))
    # In an enrollment flow the plan context carries prompt data and certificate details
    self.assertEqual(plan().context[PLAN_CONTEXT_PROMPT], {"email": None, "name": "client"})
    # Expected values are presumably fixed properties of fixtures/cert_client.pem
    self.assertEqual(
        plan().context[PLAN_CONTEXT_CERTIFICATE],
        {
            "fingerprint_sha1": "52:39:ca:1e:3a:1f:78:3a:9f:26:3b:c2:84:99:48:68:99:99:81:8a",
            "fingerprint_sha256": (
                "c1:07:8b:7c:e9:02:57:87:1e:92:e5:81:83:21:bc:92:c7:47:65:e3:97:fb:05:97:6f:36:9e:b5:31:77:98:b7"
            ),
            "issuer": "OU=Self-signed,O=authentik,CN=authentik Test CA",
            "serial_number": "70153443448884702681996102271549704759327537151",
            "subject": "CN=client",
        },
    )

Test Enrollment flow