authentik.crypto.signals
authentik crypto signals
1"""authentik crypto signals""" 2 3from binascii import hexlify 4from datetime import datetime 5from ssl import CertificateError 6 7from cryptography.hazmat.primitives import hashes 8from cryptography.x509 import Certificate 9from django.db.models.signals import pre_save 10from django.dispatch import receiver 11from structlog.stdlib import get_logger 12 13from authentik.crypto.models import ( 14 CertificateKeyPair, 15 detect_key_type, 16 fingerprint_sha256, 17 generate_key_id, 18 generate_key_id_legacy, 19) 20 21LOGGER = get_logger() 22 23 24def extract_certificate_metadata(certificate: Certificate) -> dict[str, str | datetime]: 25 """Extract all metadata fields from a certificate.""" 26 metadata = {} 27 28 try: 29 metadata["key_type"] = detect_key_type(certificate) 30 metadata["cert_expiry"] = certificate.not_valid_after_utc 31 metadata["cert_subject"] = certificate.subject.rfc4514_string() 32 metadata["fingerprint_sha256"] = fingerprint_sha256(certificate) 33 metadata["fingerprint_sha1"] = hexlify( 34 certificate.fingerprint(hashes.SHA1()), ":" # nosec 35 ).decode("utf-8") 36 except (ValueError, TypeError, AttributeError) as exc: 37 raise CertificateError(f"Invalid certificate metadata: {exc}") from exc 38 39 return metadata 40 41 42@receiver(pre_save, sender="authentik_crypto.CertificateKeyPair") 43def certificate_key_pair_pre_save( 44 sender: type[CertificateKeyPair], instance: CertificateKeyPair, **_ 45): 46 """Automatically populate certificate metadata fields before saving""" 47 48 # Only extract metadata if certificate_data is present 49 if not instance.certificate_data: 50 return 51 52 try: 53 metadata = extract_certificate_metadata(instance.certificate) 54 except (CertificateError, ValueError, TypeError, AttributeError) as exc: 55 LOGGER.warning("Failed to extract certificate metadata", exc=exc) 56 return 57 58 instance.key_type = metadata["key_type"] 59 instance.cert_expiry = metadata["cert_expiry"] 60 instance.cert_subject = metadata["cert_subject"] 61 instance.fingerprint_sha256 = metadata["fingerprint_sha256"] 62 instance.fingerprint_sha1 = metadata["fingerprint_sha1"] 63 64 # Generate kid if not set, or regenerate if key_data has changed 65 # Preserve existing kid (MD5 or SHA512) if it matches the current key_data 66 if instance.key_data: 67 new_kid = generate_key_id(instance.key_data) 68 legacy_kid = generate_key_id_legacy(instance.key_data) 69 if instance.kid not in (new_kid, legacy_kid): 70 instance.kid = new_kid
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
extract_certificate_metadata( certificate: cryptography.hazmat.bindings._rust.x509.Certificate) -> dict[str, str | datetime.datetime]:
25def extract_certificate_metadata(certificate: Certificate) -> dict[str, str | datetime]: 26 """Extract all metadata fields from a certificate.""" 27 metadata = {} 28 29 try: 30 metadata["key_type"] = detect_key_type(certificate) 31 metadata["cert_expiry"] = certificate.not_valid_after_utc 32 metadata["cert_subject"] = certificate.subject.rfc4514_string() 33 metadata["fingerprint_sha256"] = fingerprint_sha256(certificate) 34 metadata["fingerprint_sha1"] = hexlify( 35 certificate.fingerprint(hashes.SHA1()), ":" # nosec 36 ).decode("utf-8") 37 except (ValueError, TypeError, AttributeError) as exc: 38 raise CertificateError(f"Invalid certificate metadata: {exc}") from exc 39 40 return metadata
Extract all metadata fields from a certificate.
@receiver(pre_save, sender='authentik_crypto.CertificateKeyPair')
def
certificate_key_pair_pre_save( sender: type[authentik.crypto.models.CertificateKeyPair], instance: authentik.crypto.models.CertificateKeyPair, **_):
43@receiver(pre_save, sender="authentik_crypto.CertificateKeyPair") 44def certificate_key_pair_pre_save( 45 sender: type[CertificateKeyPair], instance: CertificateKeyPair, **_ 46): 47 """Automatically populate certificate metadata fields before saving""" 48 49 # Only extract metadata if certificate_data is present 50 if not instance.certificate_data: 51 return 52 53 try: 54 metadata = extract_certificate_metadata(instance.certificate) 55 except (CertificateError, ValueError, TypeError, AttributeError) as exc: 56 LOGGER.warning("Failed to extract certificate metadata", exc=exc) 57 return 58 59 instance.key_type = metadata["key_type"] 60 instance.cert_expiry = metadata["cert_expiry"] 61 instance.cert_subject = metadata["cert_subject"] 62 instance.fingerprint_sha256 = metadata["fingerprint_sha256"] 63 instance.fingerprint_sha1 = metadata["fingerprint_sha1"] 64 65 # Generate kid if not set, or regenerate if key_data has changed 66 # Preserve existing kid (MD5 or SHA512) if it matches the current key_data 67 if instance.key_data: 68 new_kid = generate_key_id(instance.key_data) 69 legacy_kid = generate_key_id_legacy(instance.key_data) 70 if instance.kid not in (new_kid, legacy_kid): 71 instance.kid = new_kid
Automatically populate certificate metadata fields before saving