authentik.crypto.signals

authentik crypto signals

 1"""authentik crypto signals"""
 2
 3from binascii import hexlify
 4from datetime import datetime
 5from ssl import CertificateError
 6
 7from cryptography.hazmat.primitives import hashes
 8from cryptography.x509 import Certificate
 9from django.db.models.signals import pre_save
10from django.dispatch import receiver
11from structlog.stdlib import get_logger
12
13from authentik.crypto.models import (
14    CertificateKeyPair,
15    detect_key_type,
16    fingerprint_sha256,
17    generate_key_id,
18    generate_key_id_legacy,
19)
20
21LOGGER = get_logger()
22
23
24def extract_certificate_metadata(certificate: Certificate) -> dict[str, str | datetime]:
25    """Extract all metadata fields from a certificate."""
26    metadata = {}
27
28    try:
29        metadata["key_type"] = detect_key_type(certificate)
30        metadata["cert_expiry"] = certificate.not_valid_after_utc
31        metadata["cert_subject"] = certificate.subject.rfc4514_string()
32        metadata["fingerprint_sha256"] = fingerprint_sha256(certificate)
33        metadata["fingerprint_sha1"] = hexlify(
34            certificate.fingerprint(hashes.SHA1()), ":"  # nosec
35        ).decode("utf-8")
36    except (ValueError, TypeError, AttributeError) as exc:
37        raise CertificateError(f"Invalid certificate metadata: {exc}") from exc
38
39    return metadata
40
41
42@receiver(pre_save, sender="authentik_crypto.CertificateKeyPair")
43def certificate_key_pair_pre_save(
44    sender: type[CertificateKeyPair], instance: CertificateKeyPair, **_
45):
46    """Automatically populate certificate metadata fields before saving"""
47
48    # Only extract metadata if certificate_data is present
49    if not instance.certificate_data:
50        return
51
52    try:
53        metadata = extract_certificate_metadata(instance.certificate)
54    except (CertificateError, ValueError, TypeError, AttributeError) as exc:
55        LOGGER.warning("Failed to extract certificate metadata", exc=exc)
56        return
57
58    instance.key_type = metadata["key_type"]
59    instance.cert_expiry = metadata["cert_expiry"]
60    instance.cert_subject = metadata["cert_subject"]
61    instance.fingerprint_sha256 = metadata["fingerprint_sha256"]
62    instance.fingerprint_sha1 = metadata["fingerprint_sha1"]
63
64    # Generate kid if not set, or regenerate if key_data has changed
65    # Preserve existing kid (MD5 or SHA512) if it matches the current key_data
66    if instance.key_data:
67        new_kid = generate_key_id(instance.key_data)
68        legacy_kid = generate_key_id_legacy(instance.key_data)
69        if instance.kid not in (new_kid, legacy_kid):
70            instance.kid = new_kid
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def extract_certificate_metadata( certificate: cryptography.hazmat.bindings._rust.x509.Certificate) -> dict[str, str | datetime.datetime]:
25def extract_certificate_metadata(certificate: Certificate) -> dict[str, str | datetime]:
26    """Extract all metadata fields from a certificate."""
27    metadata = {}
28
29    try:
30        metadata["key_type"] = detect_key_type(certificate)
31        metadata["cert_expiry"] = certificate.not_valid_after_utc
32        metadata["cert_subject"] = certificate.subject.rfc4514_string()
33        metadata["fingerprint_sha256"] = fingerprint_sha256(certificate)
34        metadata["fingerprint_sha1"] = hexlify(
35            certificate.fingerprint(hashes.SHA1()), ":"  # nosec
36        ).decode("utf-8")
37    except (ValueError, TypeError, AttributeError) as exc:
38        raise CertificateError(f"Invalid certificate metadata: {exc}") from exc
39
40    return metadata

Extract all metadata fields from a certificate.

@receiver(pre_save, sender='authentik_crypto.CertificateKeyPair')
def certificate_key_pair_pre_save( sender: type[authentik.crypto.models.CertificateKeyPair], instance: authentik.crypto.models.CertificateKeyPair, **_):
43@receiver(pre_save, sender="authentik_crypto.CertificateKeyPair")
44def certificate_key_pair_pre_save(
45    sender: type[CertificateKeyPair], instance: CertificateKeyPair, **_
46):
47    """Automatically populate certificate metadata fields before saving"""
48
49    # Only extract metadata if certificate_data is present
50    if not instance.certificate_data:
51        return
52
53    try:
54        metadata = extract_certificate_metadata(instance.certificate)
55    except (CertificateError, ValueError, TypeError, AttributeError) as exc:
56        LOGGER.warning("Failed to extract certificate metadata", exc=exc)
57        return
58
59    instance.key_type = metadata["key_type"]
60    instance.cert_expiry = metadata["cert_expiry"]
61    instance.cert_subject = metadata["cert_subject"]
62    instance.fingerprint_sha256 = metadata["fingerprint_sha256"]
63    instance.fingerprint_sha1 = metadata["fingerprint_sha1"]
64
65    # Generate kid if not set, or regenerate if key_data has changed
66    # Preserve existing kid (MD5 or SHA512) if it matches the current key_data
67    if instance.key_data:
68        new_kid = generate_key_id(instance.key_data)
69        legacy_kid = generate_key_id_legacy(instance.key_data)
70        if instance.kid not in (new_kid, legacy_kid):
71            instance.kid = new_kid

Automatically populate certificate metadata fields before saving