authentik.crypto.models

authentik crypto models

  1"""authentik crypto models"""
  2
  3from base64 import urlsafe_b64encode
  4from binascii import hexlify
  5from hashlib import md5, sha512
  6from ssl import PEM_FOOTER, PEM_HEADER
  7from textwrap import wrap
  8from uuid import uuid4
  9
 10from cryptography.hazmat.backends import default_backend
 11from cryptography.hazmat.primitives import hashes
 12from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey
 13from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
 14from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PublicKey
 15from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
 16from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
 17from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes, PublicKeyTypes
 18from cryptography.hazmat.primitives.serialization import load_pem_private_key
 19from cryptography.x509 import Certificate, load_pem_x509_certificate
 20from django.db import models
 21from django.utils.translation import gettext_lazy as _
 22from rest_framework.serializers import Serializer
 23from structlog.stdlib import get_logger
 24
 25from authentik.blueprints.models import ManagedModel
 26from authentik.lib.models import CreatedUpdatedModel, SerializerModel
 27
# Module-level structlog logger; the helpers and model below use it for warnings.
LOGGER = get_logger()
 29
 30
 31def format_cert(raw_pam: str) -> str:
 32    """Format a PEM certificate that is either missing its header/footer or is in a single line"""
 33    return "\n".join([PEM_HEADER, *wrap(raw_pam.replace("\n", ""), 64), PEM_FOOTER])
 34
 35
 36class KeyType(models.TextChoices):
 37    """Cryptographic key algorithm types"""
 38
 39    RSA = "rsa", _("RSA")
 40    EC = "ec", _("Elliptic Curve")
 41    DSA = "dsa", _("DSA")
 42    ED25519 = "ed25519", _("Ed25519")
 43    ED448 = "ed448", _("Ed448")
 44
 45
 46def fingerprint_sha256(cert: Certificate) -> str:
 47    """Get SHA256 Fingerprint of certificate"""
 48    return hexlify(cert.fingerprint(hashes.SHA256()), ":").decode("utf-8")
 49
 50
 51def detect_key_type(certificate: Certificate) -> str | None:
 52    """Detect the key algorithm type by parsing the certificate's public key"""
 53    try:
 54        public_key = certificate.public_key()
 55        if isinstance(public_key, RSAPublicKey):
 56            return KeyType.RSA
 57        if isinstance(public_key, EllipticCurvePublicKey):
 58            return KeyType.EC
 59        if isinstance(public_key, DSAPublicKey):
 60            return KeyType.DSA
 61        if isinstance(public_key, Ed25519PublicKey):
 62            return KeyType.ED25519
 63        if isinstance(public_key, Ed448PublicKey):
 64            return KeyType.ED448
 65    except (ValueError, TypeError, AttributeError) as exc:
 66        LOGGER.warning("Failed to detect key type", exc=exc)
 67    return None
 68
 69
 70def generate_key_id(key_data: str) -> str:
 71    """Generate Key ID using SHA512 + urlsafe_b64encode."""
 72    if not key_data:
 73        return ""
 74    return urlsafe_b64encode(sha512(key_data.encode("utf-8")).digest()).decode("utf-8").rstrip("=")
 75
 76
 77def generate_key_id_legacy(key_data: str) -> str:
 78    """Generate Key ID using MD5 (legacy format for backwards compatibility)."""
 79    if not key_data:
 80        return ""
 81    return md5(key_data.encode("utf-8"), usedforsecurity=False).hexdigest()  # nosec
 82
 83
 84class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):
 85    """CertificateKeyPair that can be used for signing or encrypting if `key_data`
 86    is set, otherwise it can be used to verify remote data."""
 87
 88    kp_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
 89
 90    name = models.TextField(unique=True)
 91    certificate_data = models.TextField(help_text=_("PEM-encoded Certificate data"))
 92    key_data = models.TextField(
 93        help_text=_(
 94            "Optional Private Key. If this is set, you can use this keypair for encryption."
 95        ),
 96        blank=True,
 97        default="",
 98    )
 99    key_type = models.CharField(
100        max_length=16,
101        choices=KeyType.choices,
102        null=True,
103        blank=True,
104        help_text=_("Key algorithm type detected from the certificate's public key"),
105    )
106    cert_expiry = models.DateTimeField(
107        null=True,
108        blank=True,
109        help_text=_("Certificate expiry date"),
110    )
111    cert_subject = models.TextField(
112        null=True,
113        blank=True,
114        help_text=_("Certificate subject as RFC4514 string"),
115    )
116    fingerprint_sha256 = models.CharField(
117        max_length=95,
118        null=True,
119        blank=True,
120        help_text=_("SHA256 fingerprint of the certificate"),
121    )
122    fingerprint_sha1 = models.CharField(
123        max_length=59,
124        null=True,
125        blank=True,
126        help_text=_("SHA1 fingerprint of the certificate"),
127    )
128    kid = models.CharField(
129        max_length=128,
130        null=True,
131        blank=True,
132        help_text=_("Key ID generated from private key"),
133    )
134
135    _cert: Certificate | None = None
136    _private_key: PrivateKeyTypes | None = None
137    _public_key: PublicKeyTypes | None = None
138
139    @property
140    def serializer(self) -> Serializer:
141        from authentik.crypto.api import CertificateKeyPairSerializer
142
143        return CertificateKeyPairSerializer
144
145    @property
146    def certificate(self) -> Certificate:
147        """Get python cryptography Certificate instance"""
148        if not self._cert:
149            self._cert = load_pem_x509_certificate(
150                self.certificate_data.encode("utf-8"), default_backend()
151            )
152        return self._cert
153
154    @property
155    def public_key(self) -> PublicKeyTypes | None:
156        """Get public key of the private key"""
157        if not self._public_key:
158            self._public_key = self.private_key.public_key()
159        return self._public_key
160
161    @property
162    def private_key(
163        self,
164    ) -> PrivateKeyTypes | None:
165        """Get python cryptography PrivateKey instance"""
166        if not self._private_key and self.key_data != "":
167            try:
168                self._private_key = load_pem_private_key(
169                    str.encode("\n".join([x.strip() for x in self.key_data.split("\n")])),
170                    password=None,
171                    backend=default_backend(),
172                )
173            except ValueError as exc:
174                LOGGER.warning(exc)
175                return None
176        return self._private_key
177
178    def __str__(self) -> str:
179        return f"Certificate-Key Pair {self.name}"
180
181    class Meta:
182        verbose_name = _("Certificate-Key Pair")
183        verbose_name_plural = _("Certificate-Key Pairs")
184        permissions = [
185            ("view_certificatekeypair_certificate", _("View Certificate-Key pair's certificate")),
186            ("view_certificatekeypair_key", _("View Certificate-Key pair's private key")),
187        ]
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def format_cert(raw_pam: str) -> str:
def format_cert(raw_pam: str) -> str:
    """Normalise certificate text into a well-formed PEM block.

    Accepts base64 certificate data that lacks the PEM header/footer, that
    already carries them, or that arrives on a single line.

    Args:
        raw_pam: Certificate payload, with or without PEM armor, using any
            mix of whitespace and line endings.

    Returns:
        ``PEM_HEADER``, the payload split into lines of at most 64
        characters, and ``PEM_FOOTER``, joined by newlines.
    """
    # Remove armor that is already present so it is never emitted twice. This
    # must happen before whitespace is dropped: the markers contain a space.
    payload = raw_pam.replace(PEM_HEADER, "").replace(PEM_FOOTER, "")
    # Drop every kind of whitespace ("\r", spaces, tabs), not only "\n".
    payload = "".join(payload.split())
    # Fixed-width slicing keeps every line at exactly 64 characters; a
    # word-wrapping helper would treat spaces and hyphens as break points.
    lines = [payload[start : start + 64] for start in range(0, len(payload), 64)]
    return "\n".join([PEM_HEADER, *lines, PEM_FOOTER])

Format a PEM certificate that is either missing its header/footer or is in a single line

class KeyType(django.db.models.enums.TextChoices):
class KeyType(models.TextChoices):
    """Key algorithms a certificate's public key can be classified as"""

    # Each member pairs the stored value with its translatable display label.
    RSA = "rsa", _("RSA")
    EC = "ec", _("Elliptic Curve")
    DSA = "dsa", _("DSA")
    ED25519 = "ed25519", _("Ed25519")
    ED448 = "ed448", _("Ed448")

Cryptographic key algorithm types

RSA = KeyType.RSA
EC = KeyType.EC
DSA = KeyType.DSA
ED25519 = KeyType.ED25519
ED448 = KeyType.ED448
def fingerprint_sha256(cert: cryptography.hazmat.bindings._rust.x509.Certificate) -> str:
def fingerprint_sha256(cert: Certificate) -> str:
    """Return the certificate's SHA256 fingerprint as colon-separated hex"""
    digest = cert.fingerprint(hashes.SHA256())
    return digest.hex(":")

Get SHA256 Fingerprint of certificate

def detect_key_type( certificate: cryptography.hazmat.bindings._rust.x509.Certificate) -> str | None:
def detect_key_type(certificate: Certificate) -> str | None:
    """Map a certificate's public key to the matching ``KeyType`` member.

    Returns None when the key matches none of the known key classes, or when
    reading it raises ValueError, TypeError or AttributeError (the error is
    logged as a warning).
    """
    try:
        key = certificate.public_key()
        # Checked in order; the first matching key class wins.
        known_types = (
            (RSAPublicKey, KeyType.RSA),
            (EllipticCurvePublicKey, KeyType.EC),
            (DSAPublicKey, KeyType.DSA),
            (Ed25519PublicKey, KeyType.ED25519),
            (Ed448PublicKey, KeyType.ED448),
        )
        for key_class, key_type in known_types:
            if isinstance(key, key_class):
                return key_type
    except (ValueError, TypeError, AttributeError) as exc:
        LOGGER.warning("Failed to detect key type", exc=exc)
    return None

Detect the key algorithm type by parsing the certificate's public key

def generate_key_id(key_data: str) -> str:
def generate_key_id(key_data: str) -> str:
    """Derive a Key ID from key material.

    The ID is the URL-safe base64 encoding of the SHA512 digest of the UTF-8
    encoded input, with trailing ``=`` padding removed. Empty input yields an
    empty string.
    """
    if key_data:
        digest = sha512(key_data.encode("utf-8")).digest()
        encoded = urlsafe_b64encode(digest).decode("utf-8")
        return encoded.rstrip("=")
    return ""

Generate Key ID using SHA512 + urlsafe_b64encode.

def generate_key_id_legacy(key_data: str) -> str:
def generate_key_id_legacy(key_data: str) -> str:
    """Derive a Key ID in the legacy format (kept for backwards compatibility).

    The ID is the hex MD5 digest of the UTF-8 encoded input. Empty input
    yields an empty string.
    """
    if key_data:
        encoded = key_data.encode("utf-8")
        # MD5 only serves as an identifier here, not as a security measure.
        return md5(encoded, usedforsecurity=False).hexdigest()  # nosec
    return ""

Generate Key ID using MD5 (legacy format for backwards compatibility).

 85class CertificateKeyPair(SerializerModel, ManagedModel, CreatedUpdatedModel):
 86    """CertificateKeyPair that can be used for signing or encrypting if `key_data`
 87    is set, otherwise it can be used to verify remote data."""
 88
 89    kp_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
 90
 91    name = models.TextField(unique=True)
 92    certificate_data = models.TextField(help_text=_("PEM-encoded Certificate data"))
 93    key_data = models.TextField(
 94        help_text=_(
 95            "Optional Private Key. If this is set, you can use this keypair for encryption."
 96        ),
 97        blank=True,
 98        default="",
 99    )
100    key_type = models.CharField(
101        max_length=16,
102        choices=KeyType.choices,
103        null=True,
104        blank=True,
105        help_text=_("Key algorithm type detected from the certificate's public key"),
106    )
107    cert_expiry = models.DateTimeField(
108        null=True,
109        blank=True,
110        help_text=_("Certificate expiry date"),
111    )
112    cert_subject = models.TextField(
113        null=True,
114        blank=True,
115        help_text=_("Certificate subject as RFC4514 string"),
116    )
117    fingerprint_sha256 = models.CharField(
118        max_length=95,
119        null=True,
120        blank=True,
121        help_text=_("SHA256 fingerprint of the certificate"),
122    )
123    fingerprint_sha1 = models.CharField(
124        max_length=59,
125        null=True,
126        blank=True,
127        help_text=_("SHA1 fingerprint of the certificate"),
128    )
129    kid = models.CharField(
130        max_length=128,
131        null=True,
132        blank=True,
133        help_text=_("Key ID generated from private key"),
134    )
135
136    _cert: Certificate | None = None
137    _private_key: PrivateKeyTypes | None = None
138    _public_key: PublicKeyTypes | None = None
139
140    @property
141    def serializer(self) -> Serializer:
142        from authentik.crypto.api import CertificateKeyPairSerializer
143
144        return CertificateKeyPairSerializer
145
146    @property
147    def certificate(self) -> Certificate:
148        """Get python cryptography Certificate instance"""
149        if not self._cert:
150            self._cert = load_pem_x509_certificate(
151                self.certificate_data.encode("utf-8"), default_backend()
152            )
153        return self._cert
154
155    @property
156    def public_key(self) -> PublicKeyTypes | None:
157        """Get public key of the private key"""
158        if not self._public_key:
159            self._public_key = self.private_key.public_key()
160        return self._public_key
161
162    @property
163    def private_key(
164        self,
165    ) -> PrivateKeyTypes | None:
166        """Get python cryptography PrivateKey instance"""
167        if not self._private_key and self.key_data != "":
168            try:
169                self._private_key = load_pem_private_key(
170                    str.encode("\n".join([x.strip() for x in self.key_data.split("\n")])),
171                    password=None,
172                    backend=default_backend(),
173                )
174            except ValueError as exc:
175                LOGGER.warning(exc)
176                return None
177        return self._private_key
178
179    def __str__(self) -> str:
180        return f"Certificate-Key Pair {self.name}"
181
182    class Meta:
183        verbose_name = _("Certificate-Key Pair")
184        verbose_name_plural = _("Certificate-Key Pairs")
185        permissions = [
186            ("view_certificatekeypair_certificate", _("View Certificate-Key pair's certificate")),
187            ("view_certificatekeypair_key", _("View Certificate-Key pair's private key")),
188        ]

CertificateKeyPair that can be used for signing or encrypting if key_data is set, otherwise it can be used to verify remote data.

def kp_uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def certificate_data(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def key_data(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def key_type(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def cert_expiry(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def cert_subject(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def fingerprint_sha256(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def fingerprint_sha1(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def kid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: rest_framework.serializers.Serializer
140    @property
141    def serializer(self) -> Serializer:
142        from authentik.crypto.api import CertificateKeyPairSerializer
143
144        return CertificateKeyPairSerializer

Get serializer for this model

certificate: cryptography.hazmat.bindings._rust.x509.Certificate
146    @property
147    def certificate(self) -> Certificate:
148        """Get python cryptography Certificate instance"""
149        if not self._cert:
150            self._cert = load_pem_x509_certificate(
151                self.certificate_data.encode("utf-8"), default_backend()
152            )
153        return self._cert

Get python cryptography Certificate instance

public_key: cryptography.hazmat.primitives.asymmetric.dh.DHPublicKey | cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey | cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey | None
155    @property
156    def public_key(self) -> PublicKeyTypes | None:
157        """Get public key of the private key"""
158        if not self._public_key:
159            self._public_key = self.private_key.public_key()
160        return self._public_key

Get public key of the private key

private_key: cryptography.hazmat.primitives.asymmetric.dh.DHPrivateKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey | cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey | cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey | None
162    @property
163    def private_key(
164        self,
165    ) -> PrivateKeyTypes | None:
166        """Get python cryptography PrivateKey instance"""
167        if not self._private_key and self.key_data != "":
168            try:
169                self._private_key = load_pem_private_key(
170                    str.encode("\n".join([x.strip() for x in self.key_data.split("\n")])),
171                    password=None,
172                    backend=default_backend(),
173                )
174            except ValueError as exc:
175                LOGGER.warning(exc)
176                return None
177        return self._private_key

Get python cryptography PrivateKey instance

def managed(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_updated(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def get_key_type_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def objects(unknown):

The type of the None singleton.

brand_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

notificationtransport_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

agentconnector_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

oauth2provider_signing_key_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

oauth2provider_encryption_key_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

ldapprovider_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

proxyprovider_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

radiusprovider_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

samlsource_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

samlprovider_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

ldap_peer_certificates

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

ldap_client_certificates

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

ssfprovider_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

mutualtlsstage_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class CertificateKeyPair.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class CertificateKeyPair.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.