authentik.enterprise.stages.mtls.stage
1from binascii import hexlify 2from enum import IntFlag, auto 3from urllib.parse import unquote_plus 4 5from cryptography.exceptions import InvalidSignature 6from cryptography.hazmat.primitives import hashes 7from cryptography.x509 import ( 8 Certificate, 9 NameOID, 10 ObjectIdentifier, 11 RFC822Name, 12 SubjectAlternativeName, 13 UnsupportedGeneralNameType, 14 load_pem_x509_certificate, 15) 16from cryptography.x509.verification import PolicyBuilder, Store, VerificationError 17from django.utils.translation import gettext_lazy as _ 18 19from authentik.brands.models import Brand 20from authentik.core.models import User 21from authentik.crypto.models import CertificateKeyPair, fingerprint_sha256, format_cert 22from authentik.endpoints.models import StageMode 23from authentik.enterprise.stages.mtls.models import ( 24 CertAttributes, 25 MutualTLSStage, 26 UserAttributes, 27) 28from authentik.flows.challenge import AccessDeniedChallenge 29from authentik.flows.models import FlowDesignation 30from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 31from authentik.flows.stage import ChallengeStageView 32from authentik.root.middleware import ClientIPMiddleware 33from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS 34from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 35 36# All of these headers must only be accepted from "trusted" reverse proxies 37# See internal/web/proxy.go:39 38HEADER_PROXY_FORWARDED = "X-Forwarded-Client-Cert" 39HEADER_NGINX_FORWARDED = "SSL-Client-Cert" 40HEADER_TRAEFIK_FORWARDED = "X-Forwarded-TLS-Client-Cert" 41HEADER_OUTPOST_FORWARDED = "X-Authentik-Outpost-Certificate" 42 43 44PLAN_CONTEXT_CERTIFICATE = "certificate" 45 46 47class ParseOptions(IntFlag): 48 49 # URL unquote the string 50 UNQUOTE = auto() 51 # Re-add PEM Header & footer, and chunk it into 64 character lines 52 FORMAT = auto() 53 54 55class MTLSStageView(ChallengeStageView): 56 57 def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]: 58 """Helper to parse a single certificate""" 59 if not raw: 60 return [] 61 for opt in options: 62 match opt: 63 case ParseOptions.FORMAT: 64 raw = format_cert(raw) 65 case ParseOptions.UNQUOTE: 66 raw = unquote_plus(raw) 67 try: 68 cert = load_pem_x509_certificate(raw.encode()) 69 return [cert] 70 except ValueError as exc: 71 self.logger.info("Failed to parse certificate", exc=exc) 72 return [] 73 74 def _parse_cert_xfcc(self) -> list[Certificate]: 75 """Parse certificates in the format given to us in 76 the format of the authentik router/envoy""" 77 # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert 78 xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED) 79 if not xfcc_raw: 80 return [] 81 certs = [] 82 for r_cert in xfcc_raw.split(","): 83 el = r_cert.split(";") 84 raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el} 85 if "Cert" not in raw_cert: 86 continue 87 certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE)) 88 return certs 89 90 def _parse_cert_nginx(self) -> list[Certificate]: 91 """Parse certificates in the format nginx-ingress gives to us""" 92 # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication 93 # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096 94 sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED) 95 return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE) 96 97 def _parse_cert_traefik(self) -> list[Certificate]: 98 """Parse certificates in the format traefik gives to us""" 99 # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/ 100 ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED) 101 if not ftcc_raw: 102 return [] 103 certs = [] 104 for cert in ftcc_raw.split(","): 105 certs.extend(self.__parse_single_cert(cert, ParseOptions.UNQUOTE, ParseOptions.FORMAT)) 106 return certs 107 108 def _parse_cert_outpost(self) -> list[Certificate]: 109 """Parse certificates in the format outposts give to us. Also authenticates 110 the outpost to ensure it has the permission to do so""" 111 user = ClientIPMiddleware.get_outpost_user(self.request) 112 if not user: 113 return [] 114 if not user.has_perm( 115 "pass_outpost_certificate", self.executor.current_stage 116 ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"): 117 return [] 118 outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED) 119 return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE) 120 121 def get_authorities(self) -> list[CertificateKeyPair] | None: 122 # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would 123 # load the certificate into the directly referenced foreign key, which we have to pickle 124 # as part of the flow plan, and cryptography certs can't be pickled 125 stage: MutualTLSStage = ( 126 MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk) 127 .prefetch_related("certificate_authorities") 128 .first() 129 ) 130 if stage.certificate_authorities.exists(): 131 return stage.certificate_authorities.order_by("name") 132 brand: Brand = self.request.brand 133 if brand.client_certificates.exists(): 134 return brand.client_certificates.order_by("name") 135 return None 136 137 def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]): 138 authorities_cert = [x.certificate for x in authorities] 139 for _cert in certs: 140 try: 141 PolicyBuilder().store(Store(authorities_cert)).build_client_verifier().verify( 142 _cert, [] 143 ) 144 return _cert 145 except ( 146 InvalidSignature, 147 TypeError, 148 ValueError, 149 VerificationError, 150 UnsupportedGeneralNameType, 151 ) as exc: 152 self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc) 153 continue 154 return None 155 156 def check_if_user(self, cert: Certificate): 157 stage: MutualTLSStage = self.executor.current_stage 158 cert_attr = None 159 user_attr = None 160 match stage.cert_attribute: 161 case CertAttributes.SUBJECT: 162 cert_attr = cert.subject.rfc4514_string() 163 case CertAttributes.COMMON_NAME: 164 cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME) 165 case CertAttributes.EMAIL: 166 cert_attr = self.get_cert_email(cert) 167 match stage.user_attribute: 168 case UserAttributes.USERNAME: 169 user_attr = "username" 170 case UserAttributes.EMAIL: 171 user_attr = "email" 172 if not user_attr or not cert_attr: 173 return None 174 return User.objects.filter(**{user_attr: cert_attr}).first() 175 176 def _cert_to_dict(self, cert: Certificate) -> dict: 177 """Represent a certificate in a dictionary, as certificate objects cannot be pickled""" 178 return { 179 "serial_number": str(cert.serial_number), 180 "subject": cert.subject.rfc4514_string(), 181 "issuer": cert.issuer.rfc4514_string(), 182 "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"), 183 "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode( # nosec 184 "utf-8" 185 ), 186 } 187 188 def auth_user(self, user: User, cert: Certificate): 189 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 190 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls") 191 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 192 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 193 {"certificate": self._cert_to_dict(cert)} 194 ) 195 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 196 197 def enroll_prepare_user(self, cert: Certificate): 198 self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {}) 199 self.executor.plan.context[PLAN_CONTEXT_PROMPT].update( 200 { 201 "email": self.get_cert_email(cert), 202 "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME), 203 } 204 ) 205 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 206 207 def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None: 208 attr = cert.subject.get_attributes_for_oid(oid) 209 if len(attr) < 1: 210 return None 211 return str(attr[0].value) 212 213 def get_cert_email(self, cert: Certificate) -> str | None: 214 ext = cert.extensions.get_extension_for_class(SubjectAlternativeName) 215 _cert_attr = ext.value.get_values_for_type(RFC822Name) 216 if len(_cert_attr) < 1: 217 return None 218 return str(_cert_attr[0]) 219 220 def dispatch(self, request, *args, **kwargs): 221 stage: MutualTLSStage = self.executor.current_stage 222 certs = [ 223 *self._parse_cert_xfcc(), 224 *self._parse_cert_nginx(), 225 *self._parse_cert_traefik(), 226 *self._parse_cert_outpost(), 227 ] 228 authorities = self.get_authorities() 229 if not authorities: 230 self.logger.warning("No Certificate authority found") 231 if stage.mode == StageMode.OPTIONAL: 232 return self.executor.stage_ok() 233 if stage.mode == StageMode.REQUIRED: 234 return super().dispatch(request, *args, **kwargs) 235 cert = self.validate_cert(authorities, certs) 236 if not cert and stage.mode == StageMode.REQUIRED: 237 self.logger.warning("Client certificate required but no certificates given") 238 return super().dispatch( 239 request, 240 *args, 241 error_message=_("Certificate required but no certificate was given."), 242 **kwargs, 243 ) 244 if not cert and stage.mode == StageMode.OPTIONAL: 245 self.logger.info("No certificate given, continuing") 246 return self.executor.stage_ok() 247 self.logger.debug("Received certificate", cert=fingerprint_sha256(cert)) 248 existing_user = self.check_if_user(cert) 249 if self.executor.flow.designation == FlowDesignation.ENROLLMENT: 250 self.enroll_prepare_user(cert) 251 elif existing_user: 252 self.auth_user(existing_user, cert) 253 else: 254 return super().dispatch( 255 request, *args, error_message=_("No user found for certificate."), **kwargs 256 ) 257 return self.executor.stage_ok() 258 259 def get_challenge(self, *args, error_message: str | None = None, **kwargs): 260 return AccessDeniedChallenge( 261 data={ 262 "component": "ak-stage-access-denied", 263 "error_message": str(error_message or "Unknown error"), 264 } 265 )
HEADER_PROXY_FORWARDED =
'X-Forwarded-Client-Cert'
HEADER_NGINX_FORWARDED =
'SSL-Client-Cert'
HEADER_TRAEFIK_FORWARDED =
'X-Forwarded-TLS-Client-Cert'
HEADER_OUTPOST_FORWARDED =
'X-Authentik-Outpost-Certificate'
PLAN_CONTEXT_CERTIFICATE =
'certificate'
class
ParseOptions(enum.IntFlag):
48class ParseOptions(IntFlag): 49 50 # URL unquote the string 51 UNQUOTE = auto() 52 # Re-add PEM Header & footer, and chunk it into 64 character lines 53 FORMAT = auto()
Support for integer-based Flags
UNQUOTE =
<ParseOptions.UNQUOTE: 1>
FORMAT =
<ParseOptions.FORMAT: 2>
56class MTLSStageView(ChallengeStageView): 57 58 def __parse_single_cert(self, raw: str | None, *options: ParseOptions) -> list[Certificate]: 59 """Helper to parse a single certificate""" 60 if not raw: 61 return [] 62 for opt in options: 63 match opt: 64 case ParseOptions.FORMAT: 65 raw = format_cert(raw) 66 case ParseOptions.UNQUOTE: 67 raw = unquote_plus(raw) 68 try: 69 cert = load_pem_x509_certificate(raw.encode()) 70 return [cert] 71 except ValueError as exc: 72 self.logger.info("Failed to parse certificate", exc=exc) 73 return [] 74 75 def _parse_cert_xfcc(self) -> list[Certificate]: 76 """Parse certificates in the format given to us in 77 the format of the authentik router/envoy""" 78 # https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_conn_man/headers#x-forwarded-client-cert 79 xfcc_raw = self.request.headers.get(HEADER_PROXY_FORWARDED) 80 if not xfcc_raw: 81 return [] 82 certs = [] 83 for r_cert in xfcc_raw.split(","): 84 el = r_cert.split(";") 85 raw_cert = {k.split("=")[0]: k.split("=")[1] for k in el} 86 if "Cert" not in raw_cert: 87 continue 88 certs.extend(self.__parse_single_cert(raw_cert["Cert"], ParseOptions.UNQUOTE)) 89 return certs 90 91 def _parse_cert_nginx(self) -> list[Certificate]: 92 """Parse certificates in the format nginx-ingress gives to us""" 93 # https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication 94 # https://github.com/kubernetes/ingress-nginx/blob/78f593b24494a0674b362faf551079f06d71b5a9/rootfs/etc/nginx/template/nginx.tmpl#L1096 95 sslcc_raw = self.request.headers.get(HEADER_NGINX_FORWARDED) 96 return self.__parse_single_cert(sslcc_raw, ParseOptions.UNQUOTE) 97 98 def _parse_cert_traefik(self) -> list[Certificate]: 99 """Parse certificates in the format traefik gives to us""" 100 # https://doc.traefik.io/traefik/reference/routing-configuration/http/middlewares/passtlsclientcert/ 101 ftcc_raw = self.request.headers.get(HEADER_TRAEFIK_FORWARDED) 102 if not ftcc_raw: 103 return [] 104 certs = [] 105 for cert in ftcc_raw.split(","): 106 certs.extend(self.__parse_single_cert(cert, ParseOptions.UNQUOTE, ParseOptions.FORMAT)) 107 return certs 108 109 def _parse_cert_outpost(self) -> list[Certificate]: 110 """Parse certificates in the format outposts give to us. Also authenticates 111 the outpost to ensure it has the permission to do so""" 112 user = ClientIPMiddleware.get_outpost_user(self.request) 113 if not user: 114 return [] 115 if not user.has_perm( 116 "pass_outpost_certificate", self.executor.current_stage 117 ) and not user.has_perm("authentik_stages_mtls.pass_outpost_certificate"): 118 return [] 119 outpost_raw = self.request.headers.get(HEADER_OUTPOST_FORWARDED) 120 return self.__parse_single_cert(outpost_raw, ParseOptions.UNQUOTE) 121 122 def get_authorities(self) -> list[CertificateKeyPair] | None: 123 # We can't access `certificate_authorities` on `self.executor.current_stage`, as that would 124 # load the certificate into the directly referenced foreign key, which we have to pickle 125 # as part of the flow plan, and cryptography certs can't be pickled 126 stage: MutualTLSStage = ( 127 MutualTLSStage.objects.filter(pk=self.executor.current_stage.pk) 128 .prefetch_related("certificate_authorities") 129 .first() 130 ) 131 if stage.certificate_authorities.exists(): 132 return stage.certificate_authorities.order_by("name") 133 brand: Brand = self.request.brand 134 if brand.client_certificates.exists(): 135 return brand.client_certificates.order_by("name") 136 return None 137 138 def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]): 139 authorities_cert = [x.certificate for x in authorities] 140 for _cert in certs: 141 try: 142 PolicyBuilder().store(Store(authorities_cert)).build_client_verifier().verify( 143 _cert, [] 144 ) 145 return _cert 146 except ( 147 InvalidSignature, 148 TypeError, 149 ValueError, 150 VerificationError, 151 UnsupportedGeneralNameType, 152 ) as exc: 153 self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc) 154 continue 155 return None 156 157 def check_if_user(self, cert: Certificate): 158 stage: MutualTLSStage = self.executor.current_stage 159 cert_attr = None 160 user_attr = None 161 match stage.cert_attribute: 162 case CertAttributes.SUBJECT: 163 cert_attr = cert.subject.rfc4514_string() 164 case CertAttributes.COMMON_NAME: 165 cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME) 166 case CertAttributes.EMAIL: 167 cert_attr = self.get_cert_email(cert) 168 match stage.user_attribute: 169 case UserAttributes.USERNAME: 170 user_attr = "username" 171 case UserAttributes.EMAIL: 172 user_attr = "email" 173 if not user_attr or not cert_attr: 174 return None 175 return User.objects.filter(**{user_attr: cert_attr}).first() 176 177 def _cert_to_dict(self, cert: Certificate) -> dict: 178 """Represent a certificate in a dictionary, as certificate objects cannot be pickled""" 179 return { 180 "serial_number": str(cert.serial_number), 181 "subject": cert.subject.rfc4514_string(), 182 "issuer": cert.issuer.rfc4514_string(), 183 "fingerprint_sha256": hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8"), 184 "fingerprint_sha1": hexlify(cert.fingerprint(hashes.SHA1()), ":").decode( # nosec 185 "utf-8" 186 ), 187 } 188 189 def auth_user(self, user: User, cert: Certificate): 190 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 191 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls") 192 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 193 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 194 {"certificate": self._cert_to_dict(cert)} 195 ) 196 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 197 198 def enroll_prepare_user(self, cert: Certificate): 199 self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {}) 200 self.executor.plan.context[PLAN_CONTEXT_PROMPT].update( 201 { 202 "email": self.get_cert_email(cert), 203 "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME), 204 } 205 ) 206 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert) 207 208 def get_cert_attribute(self, cert: Certificate, oid: ObjectIdentifier) -> str | None: 209 attr = cert.subject.get_attributes_for_oid(oid) 210 if len(attr) < 1: 211 return None 212 return str(attr[0].value) 213 214 def get_cert_email(self, cert: Certificate) -> str | None: 215 ext = cert.extensions.get_extension_for_class(SubjectAlternativeName) 216 _cert_attr = ext.value.get_values_for_type(RFC822Name) 217 if len(_cert_attr) < 1: 218 return None 219 return str(_cert_attr[0]) 220 221 def dispatch(self, request, *args, **kwargs): 222 stage: MutualTLSStage = self.executor.current_stage 223 certs = [ 224 *self._parse_cert_xfcc(), 225 *self._parse_cert_nginx(), 226 *self._parse_cert_traefik(), 227 *self._parse_cert_outpost(), 228 ] 229 authorities = self.get_authorities() 230 if not authorities: 231 self.logger.warning("No Certificate authority found") 232 if stage.mode == StageMode.OPTIONAL: 233 return self.executor.stage_ok() 234 if stage.mode == StageMode.REQUIRED: 235 return super().dispatch(request, *args, **kwargs) 236 cert = self.validate_cert(authorities, certs) 237 if not cert and stage.mode == StageMode.REQUIRED: 238 self.logger.warning("Client certificate required but no certificates given") 239 return super().dispatch( 240 request, 241 *args, 242 error_message=_("Certificate required but no certificate was given."), 243 **kwargs, 244 ) 245 if not cert and stage.mode == StageMode.OPTIONAL: 246 self.logger.info("No certificate given, continuing") 247 return self.executor.stage_ok() 248 self.logger.debug("Received certificate", cert=fingerprint_sha256(cert)) 249 existing_user = self.check_if_user(cert) 250 if self.executor.flow.designation == FlowDesignation.ENROLLMENT: 251 self.enroll_prepare_user(cert) 252 elif existing_user: 253 self.auth_user(existing_user, cert) 254 else: 255 return super().dispatch( 256 request, *args, error_message=_("No user found for certificate."), **kwargs 257 ) 258 return self.executor.stage_ok() 259 260 def get_challenge(self, *args, error_message: str | None = None, **kwargs): 261 return AccessDeniedChallenge( 262 data={ 263 "component": "ak-stage-access-denied", 264 "error_message": str(error_message or "Unknown error"), 265 } 266 )
Stage view which response with a challenge
def
validate_cert( self, authorities: list[authentik.crypto.models.CertificateKeyPair], certs: list[cryptography.hazmat.bindings._rust.x509.Certificate]):
138 def validate_cert(self, authorities: list[CertificateKeyPair], certs: list[Certificate]): 139 authorities_cert = [x.certificate for x in authorities] 140 for _cert in certs: 141 try: 142 PolicyBuilder().store(Store(authorities_cert)).build_client_verifier().verify( 143 _cert, [] 144 ) 145 return _cert 146 except ( 147 InvalidSignature, 148 TypeError, 149 ValueError, 150 VerificationError, 151 UnsupportedGeneralNameType, 152 ) as exc: 153 self.logger.warning("Discarding invalid certificate", cert=_cert, exc=exc) 154 continue 155 return None
def
check_if_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
157 def check_if_user(self, cert: Certificate): 158 stage: MutualTLSStage = self.executor.current_stage 159 cert_attr = None 160 user_attr = None 161 match stage.cert_attribute: 162 case CertAttributes.SUBJECT: 163 cert_attr = cert.subject.rfc4514_string() 164 case CertAttributes.COMMON_NAME: 165 cert_attr = self.get_cert_attribute(cert, NameOID.COMMON_NAME) 166 case CertAttributes.EMAIL: 167 cert_attr = self.get_cert_email(cert) 168 match stage.user_attribute: 169 case UserAttributes.USERNAME: 170 user_attr = "username" 171 case UserAttributes.EMAIL: 172 user_attr = "email" 173 if not user_attr or not cert_attr: 174 return None 175 return User.objects.filter(**{user_attr: cert_attr}).first()
def
auth_user( self, user: authentik.core.models.User, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
189 def auth_user(self, user: User, cert: Certificate): 190 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 191 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "mtls") 192 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 193 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 194 {"certificate": self._cert_to_dict(cert)} 195 ) 196 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def
enroll_prepare_user(self, cert: cryptography.hazmat.bindings._rust.x509.Certificate):
198 def enroll_prepare_user(self, cert: Certificate): 199 self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {}) 200 self.executor.plan.context[PLAN_CONTEXT_PROMPT].update( 201 { 202 "email": self.get_cert_email(cert), 203 "name": self.get_cert_attribute(cert, NameOID.COMMON_NAME), 204 } 205 ) 206 self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
def
get_cert_attribute( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate, oid: cryptography.hazmat.bindings._rust.ObjectIdentifier) -> str | None:
def
dispatch(self, request, *args, **kwargs):
221 def dispatch(self, request, *args, **kwargs): 222 stage: MutualTLSStage = self.executor.current_stage 223 certs = [ 224 *self._parse_cert_xfcc(), 225 *self._parse_cert_nginx(), 226 *self._parse_cert_traefik(), 227 *self._parse_cert_outpost(), 228 ] 229 authorities = self.get_authorities() 230 if not authorities: 231 self.logger.warning("No Certificate authority found") 232 if stage.mode == StageMode.OPTIONAL: 233 return self.executor.stage_ok() 234 if stage.mode == StageMode.REQUIRED: 235 return super().dispatch(request, *args, **kwargs) 236 cert = self.validate_cert(authorities, certs) 237 if not cert and stage.mode == StageMode.REQUIRED: 238 self.logger.warning("Client certificate required but no certificates given") 239 return super().dispatch( 240 request, 241 *args, 242 error_message=_("Certificate required but no certificate was given."), 243 **kwargs, 244 ) 245 if not cert and stage.mode == StageMode.OPTIONAL: 246 self.logger.info("No certificate given, continuing") 247 return self.executor.stage_ok() 248 self.logger.debug("Received certificate", cert=fingerprint_sha256(cert)) 249 existing_user = self.check_if_user(cert) 250 if self.executor.flow.designation == FlowDesignation.ENROLLMENT: 251 self.enroll_prepare_user(cert) 252 elif existing_user: 253 self.auth_user(existing_user, cert) 254 else: 255 return super().dispatch( 256 request, *args, error_message=_("No user found for certificate."), **kwargs 257 ) 258 return self.executor.stage_ok()
def
get_challenge(self, *args, error_message: str | None = None, **kwargs):
260 def get_challenge(self, *args, error_message: str | None = None, **kwargs): 261 return AccessDeniedChallenge( 262 data={ 263 "component": "ak-stage-access-denied", 264 "error_message": str(error_message or "Unknown error"), 265 } 266 )
Return the challenge that the client should solve