authentik.enterprise.stages.mtls.stage

  1from binascii import hexlify
  2from enum import IntFlag, auto
  3from urllib.parse import unquote_plus
  4
  5from cryptography.exceptions import InvalidSignature
  6from cryptography.hazmat.primitives import hashes
  7from cryptography.x509 import (
  8    Certificate,
  9    NameOID,
 10    ObjectIdentifier,
 11    RFC822Name,
 12    SubjectAlternativeName,
 13    UnsupportedGeneralNameType,
 14    load_pem_x509_certificate,
 15)
 16from cryptography.x509.verification import PolicyBuilder, Store, VerificationError
 17from django.utils.translation import gettext_lazy as _
 18
 19from authentik.brands.models import Brand
 20from authentik.core.models import User
 21from authentik.crypto.models import CertificateKeyPair, fingerprint_sha256, format_cert
 22from authentik.endpoints.models import StageMode
 23from authentik.enterprise.stages.mtls.models import (
 24    CertAttributes,
 25    MutualTLSStage,
 26    UserAttributes,
 27)
 28from authentik.flows.challenge import AccessDeniedChallenge
 29from authentik.flows.models import FlowDesignation
 30from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 31from authentik.flows.stage import ChallengeStageView
 32from authentik.root.middleware import ClientIPMiddleware
 33from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 34from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 35
 36# All of these headers must only be accepted from "trusted" reverse proxies
 37# See internal/web/proxy.go:39
 38HEADER_PROXY_FORWARDED = "X-Forwarded-Client-Cert"
 39HEADER_NGINX_FORWARDED = "SSL-Client-Cert"
 40HEADER_TRAEFIK_FORWARDED = "X-Forwarded-TLS-Client-Cert"
 41HEADER_OUTPOST_FORWARDED = "X-Authentik-Outpost-Certificate"
 42
 43
 44PLAN_CONTEXT_CERTIFICATE = "certificate"
 45
 46
 47class ParseOptions(IntFlag):
 48
 49    # URL unquote the string
 50    UNQUOTE = auto()
 51    # Re-add PEM Header & footer, and chunk it into 64 character lines
 52    FORMAT = auto()
 53
 54
 55class MTLSStageView(ChallengeStageView):
 56
 57    def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]:
 58        """Helper to parse a single certificate"""
 59        if not raw:
 60            return []
 61        for opt in options:
 62            match opt:
 63                case ParseOptions.FORMAT:
 64                    raw = format_cert(raw)
 65                case ParseOptions.UNQUOTE:
 66                    raw = unquote_plus(raw)
 67        try:
 68            cert = load_pem_x509_certificate(raw.encode())
 69            return [cert]
 70        except ValueError as exc:
 71            self.logger.info("Failed to parse certificate", exc=exc)
 72            return []
 73
 74    def _parse_cert_xfcc(self) -> list[Certificate]:
 75        """Parse certificates in the format given to us in
 76        the format of the authentik router/envoy"""
 77        # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert
 78        xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED)
 79        if not xfcc_raw:
 80            return []
 81        certs = []
 82        for r_cert in xfcc_raw.split(","):
 83            el = r_cert.split(";")
 84            raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el}
 85            if "Cert" not in raw_cert:
 86                continue
 87            certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE))
 88        return certs
 89
 90    def _parse_cert_nginx(self) -> list[Certificate]:
 91        """Parse certificates in the format nginx-ingress gives to us"""
 92        # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication
 93        # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096
 94        sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED)
 95        return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE)
 96
 97    def _parse_cert_traefik(self) -> list[Certificate]:
 98        """Parse certificates in the format traefik gives to us"""
 99        # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/
100        ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED)
101        if not ftcc_raw:
102            return []
103        certs = []
104        for cert in ftcc_raw.split(","):
105            certs.extend(self.__parse_single_cert(cert, ParseOptions.UNQUOTE, ParseOptions.FORMAT))
106        return certs
107
108    def _parse_cert_outpost(self) -> list[Certificate]:
109        """Parse certificates in the format outposts give to us. Also authenticates
110        the outpost to ensure it has the permission to do so"""
111        user = ClientIPMiddleware.get_outpost_user(self.request)
112        if not user:
113            return []
114        if not user.has_perm(
115            "pass_outpost_certificate", self.executor.current_stage
116        ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"):
117            return []
118        outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED)
119        return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE)
120
121    def get_authorities(self) -> list[CertificateKeyPair] | None:
122        # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
123        # load the certificate into the directly referenced foreign key, which we have to pickle
124        # as part of the flow plan, and cryptography certs can't be pickled
125        stage: MutualTLSStage = (
126            MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
127            .prefetch_related("certificate_authorities")
128            .first()
129        )
130        if stage.certificate_authorities.exists():
131            return stage.certificate_authorities.order_by("name")
132        brand: Brand = self.request.brand
133        if brand.client_certificates.exists():
134            return brand.client_certificates.order_by("name")
135        return None
136
137    def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
138        authorities_cert = [x.certificate for x in authorities]
139        for _cert in certs:
140            try:
141                PolicyBuilder().store(Store(authorities_cert)).build_client_verifier().verify(
142                    _cert, []
143                )
144                return _cert
145            except (
146                InvalidSignature,
147                TypeError,
148                ValueError,
149                VerificationError,
150                UnsupportedGeneralNameType,
151            ) as exc:
152                self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc)
153                continue
154        return None
155
156    def check_if_user(self, cert: Certificate):
157        stage: MutualTLSStage = self.executor.current_stage
158        cert_attr = None
159        user_attr = None
160        match stage.cert_attribute:
161            case CertAttributes.SUBJECT:
162                cert_attr = cert.subject.rfc4514_string()
163            case CertAttributes.COMMON_NAME:
164                cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
165            case CertAttributes.EMAIL:
166                cert_attr = self.get_cert_email(cert)
167        match stage.user_attribute:
168            case UserAttributes.USERNAME:
169                user_attr = "username"
170            case UserAttributes.EMAIL:
171                user_attr = "email"
172        if not user_attr or not cert_attr:
173            return None
174        return User.objects.filter(**{user_attr: cert_attr}).first()
175
176    def _cert_to_dict(self, cert: Certificate) -> dict:
177        """Represent a certificate in a dictionary, as certificate objects cannot be pickled"""
178        return {
179            "serial_number": str(cert.serial_number),
180            "subject": cert.subject.rfc4514_string(),
181            "issuer": cert.issuer.rfc4514_string(),
182            "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"),
183            "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode(  # nosec
184                "utf-8"
185            ),
186        }
187
188    def auth_user(self, user: User, cert: Certificate):
189        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
190        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
191        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
192        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
193            {"certificate": self._cert_to_dict(cert)}
194        )
195        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
196
197    def enroll_prepare_user(self, cert: Certificate):
198        self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
199        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
200            {
201                "email": self.get_cert_email(cert),
202                "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
203            }
204        )
205        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
206
207    def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
208        attr = cert.subject.get_attributes_for_oid(oid)
209        if len(attr) < 1:
210            return None
211        return str(attr[0].value)
212
213    def get_cert_email(self, cert: Certificate) -> str | None:
214        ext = cert.extensions.get_extension_for_class(SubjectAlternativeName)
215        _cert_attr = ext.value.get_values_for_type(RFC822Name)
216        if len(_cert_attr) < 1:
217            return None
218        return str(_cert_attr[0])
219
220    def dispatch(self, request, *args, **kwargs):
221        stage: MutualTLSStage = self.executor.current_stage
222        certs = [
223            *self._parse_cert_xfcc(),
224            *self._parse_cert_nginx(),
225            *self._parse_cert_traefik(),
226            *self._parse_cert_outpost(),
227        ]
228        authorities = self.get_authorities()
229        if not authorities:
230            self.logger.warning("No Certificate authority found")
231            if stage.mode == StageMode.OPTIONAL:
232                return self.executor.stage_ok()
233            if stage.mode == StageMode.REQUIRED:
234                return super().dispatch(request, *args, **kwargs)
235        cert = self.validate_cert(authorities, certs)
236        if not cert and stage.mode == StageMode.REQUIRED:
237            self.logger.warning("Client certificate required but no certificates given")
238            return super().dispatch(
239                request,
240                *args,
241                error_message=_("Certificate required but no certificate was given."),
242                **kwargs,
243            )
244        if not cert and stage.mode == StageMode.OPTIONAL:
245            self.logger.info("No certificate given, continuing")
246            return self.executor.stage_ok()
247        self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
248        existing_user = self.check_if_user(cert)
249        if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
250            self.enroll_prepare_user(cert)
251        elif existing_user:
252            self.auth_user(existing_user, cert)
253        else:
254            return super().dispatch(
255                request, *args, error_message=_("No user found for certificate."), **kwargs
256            )
257        return self.executor.stage_ok()
258
259    def get_challenge(self, *args, error_message: str | None = None, **kwargs):
260        return AccessDeniedChallenge(
261            data={
262                "component": "ak-stage-access-denied",
263                "error_message": str(error_message or "Unknown error"),
264            }
265        )
HEADER_PROXY_FORWARDED = 'X-Forwarded-Client-Cert'
HEADER_NGINX_FORWARDED = 'SSL-Client-Cert'
HEADER_TRAEFIK_FORWARDED = 'X-Forwarded-TLS-Client-Cert'
HEADER_OUTPOST_FORWARDED = 'X-Authentik-Outpost-Certificate'
PLAN_CONTEXT_CERTIFICATE = 'certificate'
class ParseOptions(enum.IntFlag):
48class ParseOptions(IntFlag):
49
50    # URL unquote the string
51    UNQUOTE = auto()
52    # Re-add PEM Header & footer, and chunk it into 64 character lines
53    FORMAT = auto()

Support for integer-based Flags

UNQUOTE = <ParseOptions.UNQUOTE: 1>
FORMAT = <ParseOptions.FORMAT: 2>
class MTLSStageView(authentik.flows.stage.ChallengeStageView):
 56class MTLSStageView(ChallengeStageView):
 57
 58    def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]:
 59        """Helper to parse a single certificate"""
 60        if not raw:
 61            return []
 62        for opt in options:
 63            match opt:
 64                case ParseOptions.FORMAT:
 65                    raw = format_cert(raw)
 66                case ParseOptions.UNQUOTE:
 67                    raw = unquote_plus(raw)
 68        try:
 69            cert = load_pem_x509_certificate(raw.encode())
 70            return [cert]
 71        except ValueError as exc:
 72            self.logger.info("Failed to parse certificate", exc=exc)
 73            return []
 74
 75    def _parse_cert_xfcc(self) -> list[Certificate]:
 76        """Parse certificates in the format given to us in
 77        the format of the authentik router/envoy"""
 78        # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert
 79        xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED)
 80        if not xfcc_raw:
 81            return []
 82        certs = []
 83        for r_cert in xfcc_raw.split(","):
 84            el = r_cert.split(";")
 85            raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el}
 86            if "Cert" not in raw_cert:
 87                continue
 88            certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE))
 89        return certs
 90
 91    def _parse_cert_nginx(self) -> list[Certificate]:
 92        """Parse certificates in the format nginx-ingress gives to us"""
 93        # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication
 94        # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096
 95        sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED)
 96        return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE)
 97
 98    def _parse_cert_traefik(self) -> list[Certificate]:
 99        """Parse certificates in the format traefik gives to us"""
100        # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/
101        ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED)
102        if not ftcc_raw:
103            return []
104        certs = []
105        for cert in ftcc_raw.split(","):
106            certs.extend(self.__parse_single_cert(cert, ParseOptions.UNQUOTE, ParseOptions.FORMAT))
107        return certs
108
109    def _parse_cert_outpost(self) -> list[Certificate]:
110        """Parse certificates in the format outposts give to us. Also authenticates
111        the outpost to ensure it has the permission to do so"""
112        user = ClientIPMiddleware.get_outpost_user(self.request)
113        if not user:
114            return []
115        if not user.has_perm(
116            "pass_outpost_certificate", self.executor.current_stage
117        ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"):
118            return []
119        outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED)
120        return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE)
121
122    def get_authorities(self) -> list[CertificateKeyPair] | None:
123        # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
124        # load the certificate into the directly referenced foreign key, which we have to pickle
125        # as part of the flow plan, and cryptography certs can't be pickled
126        stage: MutualTLSStage = (
127            MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
128            .prefetch_related("certificate_authorities")
129            .first()
130        )
131        if stage.certificate_authorities.exists():
132            return stage.certificate_authorities.order_by("name")
133        brand: Brand = self.request.brand
134        if brand.client_certificates.exists():
135            return brand.client_certificates.order_by("name")
136        return None
137
138    def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
139        authorities_cert = [x.certificate for x in authorities]
140        for _cert in certs:
141            try:
142                PolicyBuilder().store(Store(authorities_cert)).build_client_verifier().verify(
143                    _cert, []
144                )
145                return _cert
146            except (
147                InvalidSignature,
148                TypeError,
149                ValueError,
150                VerificationError,
151                UnsupportedGeneralNameType,
152            ) as exc:
153                self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc)
154                continue
155        return None
156
157    def check_if_user(self, cert: Certificate):
158        stage: MutualTLSStage = self.executor.current_stage
159        cert_attr = None
160        user_attr = None
161        match stage.cert_attribute:
162            case CertAttributes.SUBJECT:
163                cert_attr = cert.subject.rfc4514_string()
164            case CertAttributes.COMMON_NAME:
165                cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
166            case CertAttributes.EMAIL:
167                cert_attr = self.get_cert_email(cert)
168        match stage.user_attribute:
169            case UserAttributes.USERNAME:
170                user_attr = "username"
171            case UserAttributes.EMAIL:
172                user_attr = "email"
173        if not user_attr or not cert_attr:
174            return None
175        return User.objects.filter(**{user_attr: cert_attr}).first()
176
177    def _cert_to_dict(self, cert: Certificate) -> dict:
178        """Represent a certificate in a dictionary, as certificate objects cannot be pickled"""
179        return {
180            "serial_number": str(cert.serial_number),
181            "subject": cert.subject.rfc4514_string(),
182            "issuer": cert.issuer.rfc4514_string(),
183            "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"),
184            "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode(  # nosec
185                "utf-8"
186            ),
187        }
188
189    def auth_user(self, user: User, cert: Certificate):
190        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
191        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
192        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
193        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
194            {"certificate": self._cert_to_dict(cert)}
195        )
196        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
197
198    def enroll_prepare_user(self, cert: Certificate):
199        self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
200        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
201            {
202                "email": self.get_cert_email(cert),
203                "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
204            }
205        )
206        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
207
208    def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
209        attr = cert.subject.get_attributes_for_oid(oid)
210        if len(attr) < 1:
211            return None
212        return str(attr[0].value)
213
214    def get_cert_email(self, cert: Certificate) -> str | None:
215        ext = cert.extensions.get_extension_for_class(SubjectAlternativeName)
216        _cert_attr = ext.value.get_values_for_type(RFC822Name)
217        if len(_cert_attr) < 1:
218            return None
219        return str(_cert_attr[0])
220
221    def dispatch(self, request, *args, **kwargs):
222        stage: MutualTLSStage = self.executor.current_stage
223        certs = [
224            *self._parse_cert_xfcc(),
225            *self._parse_cert_nginx(),
226            *self._parse_cert_traefik(),
227            *self._parse_cert_outpost(),
228        ]
229        authorities = self.get_authorities()
230        if not authorities:
231            self.logger.warning("No Certificate authority found")
232            if stage.mode == StageMode.OPTIONAL:
233                return self.executor.stage_ok()
234            if stage.mode == StageMode.REQUIRED:
235                return super().dispatch(request, *args, **kwargs)
236        cert = self.validate_cert(authorities, certs)
237        if not cert and stage.mode == StageMode.REQUIRED:
238            self.logger.warning("Client certificate required but no certificates given")
239            return super().dispatch(
240                request,
241                *args,
242                error_message=_("Certificate required but no certificate was given."),
243                **kwargs,
244            )
245        if not cert and stage.mode == StageMode.OPTIONAL:
246            self.logger.info("No certificate given, continuing")
247            return self.executor.stage_ok()
248        self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
249        existing_user = self.check_if_user(cert)
250        if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
251            self.enroll_prepare_user(cert)
252        elif existing_user:
253            self.auth_user(existing_user, cert)
254        else:
255            return super().dispatch(
256                request, *args, error_message=_("No user found for certificate."), **kwargs
257            )
258        return self.executor.stage_ok()
259
260    def get_challenge(self, *args, error_message: str | None = None, **kwargs):
261        return AccessDeniedChallenge(
262            data={
263                "component": "ak-stage-access-denied",
264                "error_message": str(error_message or "Unknown error"),
265            }
266        )

Stage view which responds with a challenge

def get_authorities(self) -> list[authentik.crypto.models.CertificateKeyPair] | None:
122    def get_authorities(self) -> list[CertificateKeyPair] | None:
123        # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
124        # load the certificate into the directly referenced foreign key, which we have to pickle
125        # as part of the flow plan, and cryptography certs can't be pickled
126        stage: MutualTLSStage = (
127            MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
128            .prefetch_related("certificate_authorities")
129            .first()
130        )
131        if stage.certificate_authorities.exists():
132            return stage.certificate_authorities.order_by("name")
133        brand: Brand = self.request.brand
134        if brand.client_certificates.exists():
135            return brand.client_certificates.order_by("name")
136        return None
def validate_cert( self, authorities: list[authentik.crypto.models.CertificateKeyPair], certs: list[cryptography.hazmat.bindings._rust.x509.Certificate]):
138    def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
139        authorities_cert = [x.certificate for x in authorities]
140        for _cert in certs:
141            try:
142                PolicyBuilder().store(Store(authorities_cert)).build_client_verifier().verify(
143                    _cert, []
144                )
145                return _cert
146            except (
147                InvalidSignature,
148                TypeError,
149                ValueError,
150                VerificationError,
151                UnsupportedGeneralNameType,
152            ) as exc:
153                self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc)
154                continue
155        return None
def check_if_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
157    def check_if_user(self, cert: Certificate):
158        stage: MutualTLSStage = self.executor.current_stage
159        cert_attr = None
160        user_attr = None
161        match stage.cert_attribute:
162            case CertAttributes.SUBJECT:
163                cert_attr = cert.subject.rfc4514_string()
164            case CertAttributes.COMMON_NAME:
165                cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
166            case CertAttributes.EMAIL:
167                cert_attr = self.get_cert_email(cert)
168        match stage.user_attribute:
169            case UserAttributes.USERNAME:
170                user_attr = "username"
171            case UserAttributes.EMAIL:
172                user_attr = "email"
173        if not user_attr or not cert_attr:
174            return None
175        return User.objects.filter(**{user_attr: cert_attr}).first()
def auth_user( self, user: authentik.core.models.User, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
189    def auth_user(self, user: User, cert: Certificate):
190        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
191        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
192        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
193        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
194            {"certificate": self._cert_to_dict(cert)}
195        )
196        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def enroll_prepare_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
198    def enroll_prepare_user(self, cert: Certificate):
199        self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
200        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
201            {
202                "email": self.get_cert_email(cert),
203                "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
204            }
205        )
206        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def get_cert_attribute( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate, oid: cryptography.hazmat.bindings._rust.ObjectIdentifier) -> str | None:
208    def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
209        attr = cert.subject.get_attributes_for_oid(oid)
210        if len(attr) < 1:
211            return None
212        return str(attr[0].value)
def get_cert_email( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate) -> str | None:
214    def get_cert_email(self, cert: Certificate) -> str | None:
215        ext = cert.extensions.get_extension_for_class(SubjectAlternativeName)
216        _cert_attr = ext.value.get_values_for_type(RFC822Name)
217        if len(_cert_attr) < 1:
218            return None
219        return str(_cert_attr[0])
def dispatch(self, request, *args, **kwargs):
221    def dispatch(self, request, *args, **kwargs):
222        stage: MutualTLSStage = self.executor.current_stage
223        certs = [
224            *self._parse_cert_xfcc(),
225            *self._parse_cert_nginx(),
226            *self._parse_cert_traefik(),
227            *self._parse_cert_outpost(),
228        ]
229        authorities = self.get_authorities()
230        if not authorities:
231            self.logger.warning("No Certificate authority found")
232            if stage.mode == StageMode.OPTIONAL:
233                return self.executor.stage_ok()
234            if stage.mode == StageMode.REQUIRED:
235                return super().dispatch(request, *args, **kwargs)
236        cert = self.validate_cert(authorities, certs)
237        if not cert and stage.mode == StageMode.REQUIRED:
238            self.logger.warning("Client certificate required but no certificates given")
239            return super().dispatch(
240                request,
241                *args,
242                error_message=_("Certificate required but no certificate was given."),
243                **kwargs,
244            )
245        if not cert and stage.mode == StageMode.OPTIONAL:
246            self.logger.info("No certificate given, continuing")
247            return self.executor.stage_ok()
248        self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
249        existing_user = self.check_if_user(cert)
250        if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
251            self.enroll_prepare_user(cert)
252        elif existing_user:
253            self.auth_user(existing_user, cert)
254        else:
255            return super().dispatch(
256                request, *args, error_message=_("No user found for certificate."), **kwargs
257            )
258        return self.executor.stage_ok()
def get_challenge(self, *args, error_message: str | None = None, **kwargs):
260    def get_challenge(self, *args, error_message: str | None = None, **kwargs):
261        return AccessDeniedChallenge(
262            data={
263                "component": "ak-stage-access-denied",
264                "error_message": str(error_message or "Unknown error"),
265            }
266        )

Return the challenge that the client should solve