authentik.enterprise.stages.mtls.stage
1from binascii import hexlify 2from enum import IntFlag, auto 3from urllib.parse import unquote_plus 4 5from cryptography.exceptions import InvalidSignature 6from cryptography.hazmat.primitives import hashes 7from cryptography.x509 import ( 8 Certificate, 9 NameOID, 10 ObjectIdentifier, 11 RFC822Name, 12 SubjectAlternativeName, 13 UnsupportedGeneralNameType, 14 load_pem_x509_certificate, 15) 16from cryptography.x509.verification import PolicyBuilder, Store, VerificationError 17from django.utils.timezone import now 18from django.utils.translation import gettext_lazy as _ 19from rest_framework.exceptions import PermissionDenied 20 21from authentik.brands.models import Brand 22from authentik.core.models import User 23from authentik.crypto.models import CertificateKeyPair, fingerprint_sha256, format_cert 24from authentik.endpoints.models import StageMode 25from authentik.enterprise.stages.mtls.models import ( 26 CertAttributes, 27 MutualTLSStage, 28 UserAttributes, 29) 30from authentik.flows.models import FlowDesignation 31from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 32from authentik.flows.stage import ChallengeStageView 33from authentik.root.middleware import ClientIPMiddleware 34from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS 35from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 36 37# All of these headers must only be accepted from "trusted" reverse proxies 38# See internal/web/proxy.go:39 39HEADER_PROXY_FORWARDED = "X-Forwarded-Client-Cert" 40HEADER_NGINX_FORWARDED = "SSL-Client-Cert" 41HEADER_TRAEFIK_FORWARDED = "X-Forwarded-TLS-Client-Cert" 42HEADER_OUTPOST_FORWARDED = "X-Authentik-Outpost-Certificate" 43 44 45PLAN_CONTEXT_CERTIFICATE = "certificate" 46 47 48class ParseOptions(IntFlag): 49 50 # URL unquote the string 51 UNQUOTE = auto() 52 # Re-add PEM Header & footer, and chunk it into 64 character lines 53 FORMAT = auto() 54 55 56class MTLSStageView(ChallengeStageView): 57 58 def 
__parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]: 59 """Helper to parse a single certificate""" 60 if not raw: 61 return [] 62 for opt in options: 63 match opt: 64 case ParseOptions.FORMAT: 65 raw = format_cert(raw) 66 case ParseOptions.UNQUOTE: 67 raw = unquote_plus(raw) 68 try: 69 cert = load_pem_x509_certificate(raw.encode()) 70 return [cert] 71 except ValueError as exc: 72 self.logger.info("Failed to parse certificate", exc=exc) 73 return [] 74 75 def _parse_cert_xfcc(self) -> list[Certificate]: 76 """Parse certificates in the format given to us in 77 the format of the authentik router/envoy""" 78 # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert 79 xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED) 80 if not xfcc_raw: 81 return [] 82 certs = [] 83 for r_cert in xfcc_raw.split(","): 84 el = r_cert.split(";") 85 raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el} 86 if "Cert" not in raw_cert: 87 continue 88 certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE)) 89 return certs 90 91 def _parse_cert_nginx(self) -> list[Certificate]: 92 """Parse certificates in the format nginx-ingress gives to us""" 93 # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication 94 # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096 95 sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED) 96 return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE) 97 98 def _parse_cert_traefik(self) -> list[Certificate]: 99 """Parse certificates in the format traefik gives to us""" 100 # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/ 101 ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED) 102 if not ftcc_raw: 103 return [] 104 certs = [] 105 
for cert in ftcc_raw.split(","): 106 certs.extend(self.__parse_single_cert(cert, ParseOptions.FORMAT)) 107 return certs 108 109 def _parse_cert_outpost(self) -> list[Certificate]: 110 """Parse certificates in the format outposts give to us. Also authenticates 111 the outpost to ensure it has the permission to do so""" 112 user = ClientIPMiddleware.get_outpost_user(self.request) 113 if not user: 114 return [] 115 if not user.has_perm( 116 "pass_outpost_certificate", self.executor.current_stage 117 ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"): 118 return [] 119 outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED) 120 return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE) 121 122 def get_authorities(self) -> list[CertificateKeyPair] | None: 123 # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would 124 # load the certificate into the directly referenced foreign key, which we have to pickle 125 # as part of the flow plan, and cryptography certs can't be pickled 126 stage: MutualTLSStage = ( 127 MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk) 128 .prefetch_related("certificate_authorities") 129 .first() 130 ) 131 if stage.certificate_authorities.exists(): 132 return stage.certificate_authorities.order_by("name") 133 brand: Brand = self.request.brand 134 if brand.client_certificates.exists(): 135 return brand.client_certificates.order_by("name") 136 return None 137 138 def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]): 139 authorities_cert = [x.certificate for x in authorities] 140 for _cert in certs: 141 try: 142 PolicyBuilder().store(Store(authorities_cert)).time( 143 now() 144 ).build_client_verifier().verify(_cert, []) 145 return _cert 146 except ( 147 InvalidSignature, 148 TypeError, 149 ValueError, 150 VerificationError, 151 UnsupportedGeneralNameType, 152 ) as exc: 153 self.logger.warning("Discarding invalid 
certificate", cert=_cert, exc=exc) 154 continue 155 return None 156 157 def check_if_user(self, cert: Certificate): 158 stage: MutualTLSStage = self.executor.current_stage 159 cert_attr = None 160 user_attr = None 161 match stage.cert_attribute: 162 case CertAttributes.SUBJECT: 163 cert_attr = cert.subject.rfc4514_string() 164 case CertAttributes.COMMON_NAME: 165 cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME) 166 case CertAttributes.EMAIL: 167 cert_attr = self.get_cert_email(cert) 168 match stage.user_attribute: 169 case UserAttributes.USERNAME: 170 user_attr = "username" 171 case UserAttributes.EMAIL: 172 user_attr = "email" 173 if not user_attr or not cert_attr: 174 return None 175 return User.objects.filter(**{user_attr: cert_attr}).first() 176 177 def _cert_to_dict(self, cert: Certificate) -> dict: 178 """Represent a certificate in a dictionary, as certificate objects cannot be pickled""" 179 return { 180 "serial_number": str(cert.serial_number), 181 "subject": cert.subject.rfc4514_string(), 182 "issuer": cert.issuer.rfc4514_string(), 183 "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"), 184 "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode( # nosec 185 "utf-8" 186 ), 187 } 188 189 def auth_user(self, user: User, cert: Certificate): 190 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 191 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls") 192 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 193 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 194 {"certificate": self._cert_to_dict(cert)} 195 ) 196 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 197 198 def enroll_prepare_user(self, cert: Certificate): 199 self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {}) 200 self.executor.plan.context[PLAN_CONTEXT_PROMPT].update( 201 { 202 "email": self.get_cert_email(cert), 203 "name": 
self.get_cert_attribute(cert, NameOID.COMMON_NAME), 204 } 205 ) 206 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 207 208 def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None: 209 attr = cert.subject.get_attributes_for_oid(oid) 210 if len(attr) < 1: 211 return None 212 return str(attr[0].value) 213 214 def get_cert_email(self, cert: Certificate) -> str | None: 215 ext = cert.extensions.get_extension_for_class(SubjectAlternativeName) 216 _cert_attr = ext.value.get_values_for_type(RFC822Name) 217 if len(_cert_attr) < 1: 218 return None 219 return str(_cert_attr[0]) 220 221 def get_cert(self, mode: StageMode): 222 certs = [ 223 *self._parse_cert_xfcc(), 224 *self._parse_cert_nginx(), 225 *self._parse_cert_traefik(), 226 *self._parse_cert_outpost(), 227 ] 228 authorities = self.get_authorities() 229 if not authorities: 230 self.logger.warning("No Certificate authority found") 231 if mode == StageMode.OPTIONAL: 232 return None 233 if mode == StageMode.REQUIRED: 234 raise PermissionDenied("Unknown error") 235 cert = self.validate_cert(authorities, certs) 236 if not cert and mode == StageMode.REQUIRED: 237 self.logger.warning("Client certificate required but no certificates given") 238 raise PermissionDenied(str(_("Certificate required but no certificate was given."))) 239 if not cert and mode == StageMode.OPTIONAL: 240 self.logger.info("No certificate given, continuing") 241 return None 242 return cert 243 244 def dispatch(self, request, *args, **kwargs): 245 stage: MutualTLSStage = self.executor.current_stage 246 try: 247 cert = self.get_cert(stage.mode) 248 except PermissionDenied as exc: 249 return self.executor.stage_invalid(error_message=exc.detail) 250 if not cert: 251 return self.executor.stage_ok() 252 self.logger.debug("Received certificate", cert=fingerprint_sha256(cert)) 253 existing_user = self.check_if_user(cert) 254 if self.executor.flow.designation == FlowDesignation.ENROLLMENT: 255 
self.enroll_prepare_user(cert) 256 elif existing_user: 257 self.auth_user(existing_user, cert) 258 else: 259 return self.executor.stage_invalid(_("No user found for certificate.")) 260 return self.executor.stage_ok()
HEADER_PROXY_FORWARDED =
'X-Forwarded-Client-Cert'
HEADER_NGINX_FORWARDED =
'SSL-Client-Cert'
HEADER_TRAEFIK_FORWARDED =
'X-Forwarded-TLS-Client-Cert'
HEADER_OUTPOST_FORWARDED =
'X-Authentik-Outpost-Certificate'
PLAN_CONTEXT_CERTIFICATE =
'certificate'
class
ParseOptions(enum.IntFlag):
class ParseOptions(IntFlag):
    """Pre-processing steps applied to a raw header value before it is parsed as PEM"""

    # URL unquote the string
    UNQUOTE = 1 << 0
    # Re-add PEM Header & footer, and chunk it into 64 character lines
    FORMAT = 1 << 1
Support for integer-based Flags
UNQUOTE =
<ParseOptions.UNQUOTE: 1>
FORMAT =
<ParseOptions.FORMAT: 2>
57class MTLSStageView(ChallengeStageView): 58 59 def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]: 60 """Helper to parse a single certificate""" 61 if not raw: 62 return [] 63 for opt in options: 64 match opt: 65 case ParseOptions.FORMAT: 66 raw = format_cert(raw) 67 case ParseOptions.UNQUOTE: 68 raw = unquote_plus(raw) 69 try: 70 cert = load_pem_x509_certificate(raw.encode()) 71 return [cert] 72 except ValueError as exc: 73 self.logger.info("Failed to parse certificate", exc=exc) 74 return [] 75 76 def _parse_cert_xfcc(self) -> list[Certificate]: 77 """Parse certificates in the format given to us in 78 the format of the authentik router/envoy""" 79 # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert 80 xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED) 81 if not xfcc_raw: 82 return [] 83 certs = [] 84 for r_cert in xfcc_raw.split(","): 85 el = r_cert.split(";") 86 raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el} 87 if "Cert" not in raw_cert: 88 continue 89 certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE)) 90 return certs 91 92 def _parse_cert_nginx(self) -> list[Certificate]: 93 """Parse certificates in the format nginx-ingress gives to us""" 94 # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication 95 # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096 96 sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED) 97 return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE) 98 99 def _parse_cert_traefik(self) -> list[Certificate]: 100 """Parse certificates in the format traefik gives to us""" 101 # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/ 102 ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED) 
103 if not ftcc_raw: 104 return [] 105 certs = [] 106 for cert in ftcc_raw.split(","): 107 certs.extend(self.__parse_single_cert(cert, ParseOptions.FORMAT)) 108 return certs 109 110 def _parse_cert_outpost(self) -> list[Certificate]: 111 """Parse certificates in the format outposts give to us. Also authenticates 112 the outpost to ensure it has the permission to do so""" 113 user = ClientIPMiddleware.get_outpost_user(self.request) 114 if not user: 115 return [] 116 if not user.has_perm( 117 "pass_outpost_certificate", self.executor.current_stage 118 ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"): 119 return [] 120 outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED) 121 return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE) 122 123 def get_authorities(self) -> list[CertificateKeyPair] | None: 124 # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would 125 # load the certificate into the directly referenced foreign key, which we have to pickle 126 # as part of the flow plan, and cryptography certs can't be pickled 127 stage: MutualTLSStage = ( 128 MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk) 129 .prefetch_related("certificate_authorities") 130 .first() 131 ) 132 if stage.certificate_authorities.exists(): 133 return stage.certificate_authorities.order_by("name") 134 brand: Brand = self.request.brand 135 if brand.client_certificates.exists(): 136 return brand.client_certificates.order_by("name") 137 return None 138 139 def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]): 140 authorities_cert = [x.certificate for x in authorities] 141 for _cert in certs: 142 try: 143 PolicyBuilder().store(Store(authorities_cert)).time( 144 now() 145 ).build_client_verifier().verify(_cert, []) 146 return _cert 147 except ( 148 InvalidSignature, 149 TypeError, 150 ValueError, 151 VerificationError, 152 UnsupportedGeneralNameType, 153 ) as exc: 
154 self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc) 155 continue 156 return None 157 158 def check_if_user(self, cert: Certificate): 159 stage: MutualTLSStage = self.executor.current_stage 160 cert_attr = None 161 user_attr = None 162 match stage.cert_attribute: 163 case CertAttributes.SUBJECT: 164 cert_attr = cert.subject.rfc4514_string() 165 case CertAttributes.COMMON_NAME: 166 cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME) 167 case CertAttributes.EMAIL: 168 cert_attr = self.get_cert_email(cert) 169 match stage.user_attribute: 170 case UserAttributes.USERNAME: 171 user_attr = "username" 172 case UserAttributes.EMAIL: 173 user_attr = "email" 174 if not user_attr or not cert_attr: 175 return None 176 return User.objects.filter(**{user_attr: cert_attr}).first() 177 178 def _cert_to_dict(self, cert: Certificate) -> dict: 179 """Represent a certificate in a dictionary, as certificate objects cannot be pickled""" 180 return { 181 "serial_number": str(cert.serial_number), 182 "subject": cert.subject.rfc4514_string(), 183 "issuer": cert.issuer.rfc4514_string(), 184 "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"), 185 "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode( # nosec 186 "utf-8" 187 ), 188 } 189 190 def auth_user(self, user: User, cert: Certificate): 191 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 192 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls") 193 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 194 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 195 {"certificate": self._cert_to_dict(cert)} 196 ) 197 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 198 199 def enroll_prepare_user(self, cert: Certificate): 200 self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {}) 201 self.executor.plan.context[PLAN_CONTEXT_PROMPT].update( 202 { 203 "email": 
self.get_cert_email(cert), 204 "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME), 205 } 206 ) 207 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 208 209 def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None: 210 attr = cert.subject.get_attributes_for_oid(oid) 211 if len(attr) < 1: 212 return None 213 return str(attr[0].value) 214 215 def get_cert_email(self, cert: Certificate) -> str | None: 216 ext = cert.extensions.get_extension_for_class(SubjectAlternativeName) 217 _cert_attr = ext.value.get_values_for_type(RFC822Name) 218 if len(_cert_attr) < 1: 219 return None 220 return str(_cert_attr[0]) 221 222 def get_cert(self, mode: StageMode): 223 certs = [ 224 *self._parse_cert_xfcc(), 225 *self._parse_cert_nginx(), 226 *self._parse_cert_traefik(), 227 *self._parse_cert_outpost(), 228 ] 229 authorities = self.get_authorities() 230 if not authorities: 231 self.logger.warning("No Certificate authority found") 232 if mode == StageMode.OPTIONAL: 233 return None 234 if mode == StageMode.REQUIRED: 235 raise PermissionDenied("Unknown error") 236 cert = self.validate_cert(authorities, certs) 237 if not cert and mode == StageMode.REQUIRED: 238 self.logger.warning("Client certificate required but no certificates given") 239 raise PermissionDenied(str(_("Certificate required but no certificate was given."))) 240 if not cert and mode == StageMode.OPTIONAL: 241 self.logger.info("No certificate given, continuing") 242 return None 243 return cert 244 245 def dispatch(self, request, *args, **kwargs): 246 stage: MutualTLSStage = self.executor.current_stage 247 try: 248 cert = self.get_cert(stage.mode) 249 except PermissionDenied as exc: 250 return self.executor.stage_invalid(error_message=exc.detail) 251 if not cert: 252 return self.executor.stage_ok() 253 self.logger.debug("Received certificate", cert=fingerprint_sha256(cert)) 254 existing_user = self.check_if_user(cert) 255 if self.executor.flow.designation == 
FlowDesignation.ENROLLMENT: 256 self.enroll_prepare_user(cert) 257 elif existing_user: 258 self.auth_user(existing_user, cert) 259 else: 260 return self.executor.stage_invalid(_("No user found for certificate.")) 261 return self.executor.stage_ok()
Stage view which responds with a challenge
def
validate_cert( self, authorities: list[authentik.crypto.models.CertificateKeyPair], certs: list[cryptography.hazmat.bindings._rust.x509.Certificate]):
139 def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]): 140 authorities_cert = [x.certificate for x in authorities] 141 for _cert in certs: 142 try: 143 PolicyBuilder().store(Store(authorities_cert)).time( 144 now() 145 ).build_client_verifier().verify(_cert, []) 146 return _cert 147 except ( 148 InvalidSignature, 149 TypeError, 150 ValueError, 151 VerificationError, 152 UnsupportedGeneralNameType, 153 ) as exc: 154 self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc) 155 continue 156 return None
def
check_if_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
158 def check_if_user(self, cert: Certificate): 159 stage: MutualTLSStage = self.executor.current_stage 160 cert_attr = None 161 user_attr = None 162 match stage.cert_attribute: 163 case CertAttributes.SUBJECT: 164 cert_attr = cert.subject.rfc4514_string() 165 case CertAttributes.COMMON_NAME: 166 cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME) 167 case CertAttributes.EMAIL: 168 cert_attr = self.get_cert_email(cert) 169 match stage.user_attribute: 170 case UserAttributes.USERNAME: 171 user_attr = "username" 172 case UserAttributes.EMAIL: 173 user_attr = "email" 174 if not user_attr or not cert_attr: 175 return None 176 return User.objects.filter(**{user_attr: cert_attr}).first()
def
auth_user( self, user: authentik.core.models.User, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
190 def auth_user(self, user: User, cert: Certificate): 191 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 192 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls") 193 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 194 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 195 {"certificate": self._cert_to_dict(cert)} 196 ) 197 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def
enroll_prepare_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
199 def enroll_prepare_user(self, cert: Certificate): 200 self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {}) 201 self.executor.plan.context[PLAN_CONTEXT_PROMPT].update( 202 { 203 "email": self.get_cert_email(cert), 204 "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME), 205 } 206 ) 207 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def
get_cert_attribute( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate, oid: cryptography.hazmat.bindings._rust.ObjectIdentifier) -> str | None:
222 def get_cert(self, mode: StageMode): 223 certs = [ 224 *self._parse_cert_xfcc(), 225 *self._parse_cert_nginx(), 226 *self._parse_cert_traefik(), 227 *self._parse_cert_outpost(), 228 ] 229 authorities = self.get_authorities() 230 if not authorities: 231 self.logger.warning("No Certificate authority found") 232 if mode == StageMode.OPTIONAL: 233 return None 234 if mode == StageMode.REQUIRED: 235 raise PermissionDenied("Unknown error") 236 cert = self.validate_cert(authorities, certs) 237 if not cert and mode == StageMode.REQUIRED: 238 self.logger.warning("Client certificate required but no certificates given") 239 raise PermissionDenied(str(_("Certificate required but no certificate was given."))) 240 if not cert and mode == StageMode.OPTIONAL: 241 self.logger.info("No certificate given, continuing") 242 return None 243 return cert
def
dispatch(self, request, *args, **kwargs):
245 def dispatch(self, request, *args, **kwargs): 246 stage: MutualTLSStage = self.executor.current_stage 247 try: 248 cert = self.get_cert(stage.mode) 249 except PermissionDenied as exc: 250 return self.executor.stage_invalid(error_message=exc.detail) 251 if not cert: 252 return self.executor.stage_ok() 253 self.logger.debug("Received certificate", cert=fingerprint_sha256(cert)) 254 existing_user = self.check_if_user(cert) 255 if self.executor.flow.designation == FlowDesignation.ENROLLMENT: 256 self.enroll_prepare_user(cert) 257 elif existing_user: 258 self.auth_user(existing_user, cert) 259 else: 260 return self.executor.stage_invalid(_("No user found for certificate.")) 261 return self.executor.stage_ok()