authentik.enterprise.stages.mtls.stage

  1from binascii import hexlify
  2from enum import IntFlag, auto
  3from urllib.parse import unquote_plus
  4
  5from cryptography.exceptions import InvalidSignature
  6from cryptography.hazmat.primitives import hashes
  7from cryptography.x509 import (
  8    Certificate,
  9    NameOID,
 10    ObjectIdentifier,
 11    RFC822Name,
 12    SubjectAlternativeName,
 13    UnsupportedGeneralNameType,
 14    load_pem_x509_certificate,
 15)
 16from cryptography.x509.verification import PolicyBuilder, Store, VerificationError
 17from django.utils.timezone import now
 18from django.utils.translation import gettext_lazy as _
 19from rest_framework.exceptions import PermissionDenied
 20
 21from authentik.brands.models import Brand
 22from authentik.core.models import User
 23from authentik.crypto.models import CertificateKeyPair, fingerprint_sha256, format_cert
 24from authentik.endpoints.models import StageMode
 25from authentik.enterprise.stages.mtls.models import (
 26    CertAttributes,
 27    MutualTLSStage,
 28    UserAttributes,
 29)
 30from authentik.flows.models import FlowDesignation
 31from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 32from authentik.flows.stage import ChallengeStageView
 33from authentik.root.middleware import ClientIPMiddleware
 34from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 35from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 36
# All of these headers must only be accepted from "trusted" reverse proxies
# See internal/web/proxy.go:39
# Read by `_parse_cert_xfcc` (authentik router / envoy format)
HEADER_PROXY_FORWARDED = "X-Forwarded-Client-Cert"
# Read by `_parse_cert_nginx` (nginx-ingress format)
HEADER_NGINX_FORWARDED = "SSL-Client-Cert"
# Read by `_parse_cert_traefik` (traefik format)
HEADER_TRAEFIK_FORWARDED = "X-Forwarded-TLS-Client-Cert"
# Read by `_parse_cert_outpost`, which additionally checks the outpost user's permission
HEADER_OUTPOST_FORWARDED = "X-Authentik-Outpost-Certificate"


# Flow plan context key under which the dictionary form of the accepted certificate is stored
PLAN_CONTEXT_CERTIFICATE = "certificate"
 46
 47
 48class ParseOptions(IntFlag):
 49
 50    # URL unquote the string
 51    UNQUOTE = auto()
 52    # Re-add PEM Header & footer, and chunk it into 64 character lines
 53    FORMAT = auto()
 54
 55
 56class MTLSStageView(ChallengeStageView):
 57
 58    def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]:
 59        """Helper to parse a single certificate"""
 60        if not raw:
 61            return []
 62        for opt in options:
 63            match opt:
 64                case ParseOptions.FORMAT:
 65                    raw = format_cert(raw)
 66                case ParseOptions.UNQUOTE:
 67                    raw = unquote_plus(raw)
 68        try:
 69            cert = load_pem_x509_certificate(raw.encode())
 70            return [cert]
 71        except ValueError as exc:
 72            self.logger.info("Failed to parse certificate", exc=exc)
 73            return []
 74
 75    def _parse_cert_xfcc(self) -> list[Certificate]:
 76        """Parse certificates in the format given to us in
 77        the format of the authentik router/envoy"""
 78        # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert
 79        xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED)
 80        if not xfcc_raw:
 81            return []
 82        certs = []
 83        for r_cert in xfcc_raw.split(","):
 84            el = r_cert.split(";")
 85            raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el}
 86            if "Cert" not in raw_cert:
 87                continue
 88            certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE))
 89        return certs
 90
 91    def _parse_cert_nginx(self) -> list[Certificate]:
 92        """Parse certificates in the format nginx-ingress gives to us"""
 93        # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication
 94        # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096
 95        sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED)
 96        return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE)
 97
 98    def _parse_cert_traefik(self) -> list[Certificate]:
 99        """Parse certificates in the format traefik gives to us"""
100        # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/
101        ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED)
102        if not ftcc_raw:
103            return []
104        certs = []
105        for cert in ftcc_raw.split(","):
106            certs.extend(self.__parse_single_cert(cert, ParseOptions.FORMAT))
107        return certs
108
109    def _parse_cert_outpost(self) -> list[Certificate]:
110        """Parse certificates in the format outposts give to us. Also authenticates
111        the outpost to ensure it has the permission to do so"""
112        user = ClientIPMiddleware.get_outpost_user(self.request)
113        if not user:
114            return []
115        if not user.has_perm(
116            "pass_outpost_certificate", self.executor.current_stage
117        ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"):
118            return []
119        outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED)
120        return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE)
121
122    def get_authorities(self) -> list[CertificateKeyPair] | None:
123        # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
124        # load the certificate into the directly referenced foreign key, which we have to pickle
125        # as part of the flow plan, and cryptography certs can't be pickled
126        stage: MutualTLSStage = (
127            MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
128            .prefetch_related("certificate_authorities")
129            .first()
130        )
131        if stage.certificate_authorities.exists():
132            return stage.certificate_authorities.order_by("name")
133        brand: Brand = self.request.brand
134        if brand.client_certificates.exists():
135            return brand.client_certificates.order_by("name")
136        return None
137
138    def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
139        authorities_cert = [x.certificate for x in authorities]
140        for _cert in certs:
141            try:
142                PolicyBuilder().store(Store(authorities_cert)).time(
143                    now()
144                ).build_client_verifier().verify(_cert, [])
145                return _cert
146            except (
147                InvalidSignature,
148                TypeError,
149                ValueError,
150                VerificationError,
151                UnsupportedGeneralNameType,
152            ) as exc:
153                self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc)
154                continue
155        return None
156
157    def check_if_user(self, cert: Certificate):
158        stage: MutualTLSStage = self.executor.current_stage
159        cert_attr = None
160        user_attr = None
161        match stage.cert_attribute:
162            case CertAttributes.SUBJECT:
163                cert_attr = cert.subject.rfc4514_string()
164            case CertAttributes.COMMON_NAME:
165                cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
166            case CertAttributes.EMAIL:
167                cert_attr = self.get_cert_email(cert)
168        match stage.user_attribute:
169            case UserAttributes.USERNAME:
170                user_attr = "username"
171            case UserAttributes.EMAIL:
172                user_attr = "email"
173        if not user_attr or not cert_attr:
174            return None
175        return User.objects.filter(**{user_attr: cert_attr}).first()
176
177    def _cert_to_dict(self, cert: Certificate) -> dict:
178        """Represent a certificate in a dictionary, as certificate objects cannot be pickled"""
179        return {
180            "serial_number": str(cert.serial_number),
181            "subject": cert.subject.rfc4514_string(),
182            "issuer": cert.issuer.rfc4514_string(),
183            "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"),
184            "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode(  # nosec
185                "utf-8"
186            ),
187        }
188
189    def auth_user(self, user: User, cert: Certificate):
190        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
191        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
192        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
193        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
194            {"certificate": self._cert_to_dict(cert)}
195        )
196        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
197
198    def enroll_prepare_user(self, cert: Certificate):
199        self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
200        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
201            {
202                "email": self.get_cert_email(cert),
203                "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
204            }
205        )
206        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
207
208    def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
209        attr = cert.subject.get_attributes_for_oid(oid)
210        if len(attr) < 1:
211            return None
212        return str(attr[0].value)
213
214    def get_cert_email(self, cert: Certificate) -> str | None:
215        ext = cert.extensions.get_extension_for_class(SubjectAlternativeName)
216        _cert_attr = ext.value.get_values_for_type(RFC822Name)
217        if len(_cert_attr) < 1:
218            return None
219        return str(_cert_attr[0])
220
221    def get_cert(self, mode: StageMode):
222        certs = [
223            *self._parse_cert_xfcc(),
224            *self._parse_cert_nginx(),
225            *self._parse_cert_traefik(),
226            *self._parse_cert_outpost(),
227        ]
228        authorities = self.get_authorities()
229        if not authorities:
230            self.logger.warning("No Certificate authority found")
231            if mode == StageMode.OPTIONAL:
232                return None
233            if mode == StageMode.REQUIRED:
234                raise PermissionDenied("Unknown error")
235        cert = self.validate_cert(authorities, certs)
236        if not cert and mode == StageMode.REQUIRED:
237            self.logger.warning("Client certificate required but no certificates given")
238            raise PermissionDenied(str(_("Certificate required but no certificate was given.")))
239        if not cert and mode == StageMode.OPTIONAL:
240            self.logger.info("No certificate given, continuing")
241            return None
242        return cert
243
244    def dispatch(self, request, *args, **kwargs):
245        stage: MutualTLSStage = self.executor.current_stage
246        try:
247            cert = self.get_cert(stage.mode)
248        except PermissionDenied as exc:
249            return self.executor.stage_invalid(error_message=exc.detail)
250        if not cert:
251            return self.executor.stage_ok()
252        self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
253        existing_user = self.check_if_user(cert)
254        if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
255            self.enroll_prepare_user(cert)
256        elif existing_user:
257            self.auth_user(existing_user, cert)
258        else:
259            return self.executor.stage_invalid(_("No user found for certificate."))
260        return self.executor.stage_ok()
HEADER_PROXY_FORWARDED = 'X-Forwarded-Client-Cert'
HEADER_NGINX_FORWARDED = 'SSL-Client-Cert'
HEADER_TRAEFIK_FORWARDED = 'X-Forwarded-TLS-Client-Cert'
HEADER_OUTPOST_FORWARDED = 'X-Authentik-Outpost-Certificate'
PLAN_CONTEXT_CERTIFICATE = 'certificate'
class ParseOptions(enum.IntFlag):
49class ParseOptions(IntFlag):
50
51    # URL unquote the string
52    UNQUOTE = auto()
53    # Re-add PEM Header & footer, and chunk it into 64 character lines
54    FORMAT = auto()

Support for integer-based Flags

UNQUOTE = <ParseOptions.UNQUOTE: 1>
FORMAT = <ParseOptions.FORMAT: 2>
class MTLSStageView(authentik.flows.stage.ChallengeStageView):
 57class MTLSStageView(ChallengeStageView):
 58
 59    def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]:
 60        """Helper to parse a single certificate"""
 61        if not raw:
 62            return []
 63        for opt in options:
 64            match opt:
 65                case ParseOptions.FORMAT:
 66                    raw = format_cert(raw)
 67                case ParseOptions.UNQUOTE:
 68                    raw = unquote_plus(raw)
 69        try:
 70            cert = load_pem_x509_certificate(raw.encode())
 71            return [cert]
 72        except ValueError as exc:
 73            self.logger.info("Failed to parse certificate", exc=exc)
 74            return []
 75
 76    def _parse_cert_xfcc(self) -> list[Certificate]:
 77        """Parse certificates in the format given to us in
 78        the format of the authentik router/envoy"""
 79        # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert
 80        xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED)
 81        if not xfcc_raw:
 82            return []
 83        certs = []
 84        for r_cert in xfcc_raw.split(","):
 85            el = r_cert.split(";")
 86            raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el}
 87            if "Cert" not in raw_cert:
 88                continue
 89            certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE))
 90        return certs
 91
 92    def _parse_cert_nginx(self) -> list[Certificate]:
 93        """Parse certificates in the format nginx-ingress gives to us"""
 94        # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication
 95        # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096
 96        sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED)
 97        return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE)
 98
 99    def _parse_cert_traefik(self) -> list[Certificate]:
100        """Parse certificates in the format traefik gives to us"""
101        # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/
102        ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED)
103        if not ftcc_raw:
104            return []
105        certs = []
106        for cert in ftcc_raw.split(","):
107            certs.extend(self.__parse_single_cert(cert, ParseOptions.FORMAT))
108        return certs
109
110    def _parse_cert_outpost(self) -> list[Certificate]:
111        """Parse certificates in the format outposts give to us. Also authenticates
112        the outpost to ensure it has the permission to do so"""
113        user = ClientIPMiddleware.get_outpost_user(self.request)
114        if not user:
115            return []
116        if not user.has_perm(
117            "pass_outpost_certificate", self.executor.current_stage
118        ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"):
119            return []
120        outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED)
121        return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE)
122
123    def get_authorities(self) -> list[CertificateKeyPair] | None:
124        # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
125        # load the certificate into the directly referenced foreign key, which we have to pickle
126        # as part of the flow plan, and cryptography certs can't be pickled
127        stage: MutualTLSStage = (
128            MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
129            .prefetch_related("certificate_authorities")
130            .first()
131        )
132        if stage.certificate_authorities.exists():
133            return stage.certificate_authorities.order_by("name")
134        brand: Brand = self.request.brand
135        if brand.client_certificates.exists():
136            return brand.client_certificates.order_by("name")
137        return None
138
139    def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
140        authorities_cert = [x.certificate for x in authorities]
141        for _cert in certs:
142            try:
143                PolicyBuilder().store(Store(authorities_cert)).time(
144                    now()
145                ).build_client_verifier().verify(_cert, [])
146                return _cert
147            except (
148                InvalidSignature,
149                TypeError,
150                ValueError,
151                VerificationError,
152                UnsupportedGeneralNameType,
153            ) as exc:
154                self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc)
155                continue
156        return None
157
158    def check_if_user(self, cert: Certificate):
159        stage: MutualTLSStage = self.executor.current_stage
160        cert_attr = None
161        user_attr = None
162        match stage.cert_attribute:
163            case CertAttributes.SUBJECT:
164                cert_attr = cert.subject.rfc4514_string()
165            case CertAttributes.COMMON_NAME:
166                cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
167            case CertAttributes.EMAIL:
168                cert_attr = self.get_cert_email(cert)
169        match stage.user_attribute:
170            case UserAttributes.USERNAME:
171                user_attr = "username"
172            case UserAttributes.EMAIL:
173                user_attr = "email"
174        if not user_attr or not cert_attr:
175            return None
176        return User.objects.filter(**{user_attr: cert_attr}).first()
177
178    def _cert_to_dict(self, cert: Certificate) -> dict:
179        """Represent a certificate in a dictionary, as certificate objects cannot be pickled"""
180        return {
181            "serial_number": str(cert.serial_number),
182            "subject": cert.subject.rfc4514_string(),
183            "issuer": cert.issuer.rfc4514_string(),
184            "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"),
185            "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode(  # nosec
186                "utf-8"
187            ),
188        }
189
190    def auth_user(self, user: User, cert: Certificate):
191        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
192        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
193        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
194        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
195            {"certificate": self._cert_to_dict(cert)}
196        )
197        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
198
199    def enroll_prepare_user(self, cert: Certificate):
200        self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
201        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
202            {
203                "email": self.get_cert_email(cert),
204                "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
205            }
206        )
207        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
208
209    def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
210        attr = cert.subject.get_attributes_for_oid(oid)
211        if len(attr) < 1:
212            return None
213        return str(attr[0].value)
214
215    def get_cert_email(self, cert: Certificate) -> str | None:
216        ext = cert.extensions.get_extension_for_class(SubjectAlternativeName)
217        _cert_attr = ext.value.get_values_for_type(RFC822Name)
218        if len(_cert_attr) < 1:
219            return None
220        return str(_cert_attr[0])
221
222    def get_cert(self, mode: StageMode):
223        certs = [
224            *self._parse_cert_xfcc(),
225            *self._parse_cert_nginx(),
226            *self._parse_cert_traefik(),
227            *self._parse_cert_outpost(),
228        ]
229        authorities = self.get_authorities()
230        if not authorities:
231            self.logger.warning("No Certificate authority found")
232            if mode == StageMode.OPTIONAL:
233                return None
234            if mode == StageMode.REQUIRED:
235                raise PermissionDenied("Unknown error")
236        cert = self.validate_cert(authorities, certs)
237        if not cert and mode == StageMode.REQUIRED:
238            self.logger.warning("Client certificate required but no certificates given")
239            raise PermissionDenied(str(_("Certificate required but no certificate was given.")))
240        if not cert and mode == StageMode.OPTIONAL:
241            self.logger.info("No certificate given, continuing")
242            return None
243        return cert
244
245    def dispatch(self, request, *args, **kwargs):
246        stage: MutualTLSStage = self.executor.current_stage
247        try:
248            cert = self.get_cert(stage.mode)
249        except PermissionDenied as exc:
250            return self.executor.stage_invalid(error_message=exc.detail)
251        if not cert:
252            return self.executor.stage_ok()
253        self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
254        existing_user = self.check_if_user(cert)
255        if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
256            self.enroll_prepare_user(cert)
257        elif existing_user:
258            self.auth_user(existing_user, cert)
259        else:
260            return self.executor.stage_invalid(_("No user found for certificate."))
261        return self.executor.stage_ok()

Stage view which responds with a challenge

def get_authorities(self) -> list[authentik.crypto.models.CertificateKeyPair] | None:
123    def get_authorities(self) -> list[CertificateKeyPair] | None:
124        # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would
125        # load the certificate into the directly referenced foreign key, which we have to pickle
126        # as part of the flow plan, and cryptography certs can't be pickled
127        stage: MutualTLSStage = (
128            MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk)
129            .prefetch_related("certificate_authorities")
130            .first()
131        )
132        if stage.certificate_authorities.exists():
133            return stage.certificate_authorities.order_by("name")
134        brand: Brand = self.request.brand
135        if brand.client_certificates.exists():
136            return brand.client_certificates.order_by("name")
137        return None
def validate_cert( self, authorities: list[authentik.crypto.models.CertificateKeyPair], certs: list[cryptography.hazmat.bindings._rust.x509.Certificate]):
139    def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]):
140        authorities_cert = [x.certificate for x in authorities]
141        for _cert in certs:
142            try:
143                PolicyBuilder().store(Store(authorities_cert)).time(
144                    now()
145                ).build_client_verifier().verify(_cert, [])
146                return _cert
147            except (
148                InvalidSignature,
149                TypeError,
150                ValueError,
151                VerificationError,
152                UnsupportedGeneralNameType,
153            ) as exc:
154                self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc)
155                continue
156        return None
def check_if_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
158    def check_if_user(self, cert: Certificate):
159        stage: MutualTLSStage = self.executor.current_stage
160        cert_attr = None
161        user_attr = None
162        match stage.cert_attribute:
163            case CertAttributes.SUBJECT:
164                cert_attr = cert.subject.rfc4514_string()
165            case CertAttributes.COMMON_NAME:
166                cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME)
167            case CertAttributes.EMAIL:
168                cert_attr = self.get_cert_email(cert)
169        match stage.user_attribute:
170            case UserAttributes.USERNAME:
171                user_attr = "username"
172            case UserAttributes.EMAIL:
173                user_attr = "email"
174        if not user_attr or not cert_attr:
175            return None
176        return User.objects.filter(**{user_attr: cert_attr}).first()
def auth_user( self, user: authentik.core.models.User, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
190    def auth_user(self, user: User, cert: Certificate):
191        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
192        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls")
193        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
194        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
195            {"certificate": self._cert_to_dict(cert)}
196        )
197        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def enroll_prepare_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
199    def enroll_prepare_user(self, cert: Certificate):
200        self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
201        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(
202            {
203                "email": self.get_cert_email(cert),
204                "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME),
205            }
206        )
207        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def get_cert_attribute( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate, oid: cryptography.hazmat.bindings._rust.ObjectIdentifier) -> str | None:
209    def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None:
210        attr = cert.subject.get_attributes_for_oid(oid)
211        if len(attr) < 1:
212            return None
213        return str(attr[0].value)
def get_cert_email( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate) -> str | None:
215    def get_cert_email(self, cert: Certificate) -> str | None:
216        ext = cert.extensions.get_extension_for_class(SubjectAlternativeName)
217        _cert_attr = ext.value.get_values_for_type(RFC822Name)
218        if len(_cert_attr) < 1:
219            return None
220        return str(_cert_attr[0])
def get_cert(self, mode: authentik.endpoints.models.StageMode):
222    def get_cert(self, mode: StageMode):
223        certs = [
224            *self._parse_cert_xfcc(),
225            *self._parse_cert_nginx(),
226            *self._parse_cert_traefik(),
227            *self._parse_cert_outpost(),
228        ]
229        authorities = self.get_authorities()
230        if not authorities:
231            self.logger.warning("No Certificate authority found")
232            if mode == StageMode.OPTIONAL:
233                return None
234            if mode == StageMode.REQUIRED:
235                raise PermissionDenied("Unknown error")
236        cert = self.validate_cert(authorities, certs)
237        if not cert and mode == StageMode.REQUIRED:
238            self.logger.warning("Client certificate required but no certificates given")
239            raise PermissionDenied(str(_("Certificate required but no certificate was given.")))
240        if not cert and mode == StageMode.OPTIONAL:
241            self.logger.info("No certificate given, continuing")
242            return None
243        return cert
def dispatch(self, request, *args, **kwargs):
245    def dispatch(self, request, *args, **kwargs):
246        stage: MutualTLSStage = self.executor.current_stage
247        try:
248            cert = self.get_cert(stage.mode)
249        except PermissionDenied as exc:
250            return self.executor.stage_invalid(error_message=exc.detail)
251        if not cert:
252            return self.executor.stage_ok()
253        self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
254        existing_user = self.check_if_user(cert)
255        if self.executor.flow.designation == FlowDesignation.ENROLLMENT:
256            self.enroll_prepare_user(cert)
257        elif existing_user:
258            self.auth_user(existing_user, cert)
259        else:
260            return self.executor.stage_invalid(_("No user found for certificate."))
261        return self.executor.stage_ok()