authentik.crypto.apps

authentik crypto app config

 1"""authentik crypto app config"""
 2
 3from dramatiq.broker import get_broker
 4
 5from authentik.blueprints.apps import ManagedAppConfig
 6from authentik.lib.generators import generate_id
 7from authentik.lib.utils.time import fqdn_rand
 8from authentik.tasks.schedules.common import ScheduleSpec
 9
10MANAGED_KEY = "goauthentik.io/crypto/jwt-managed"
11
12
class AuthentikCryptoConfig(ManagedAppConfig):
    """authentik crypto app config

    Keeps the certificate-keypairs owned by this app in place (the managed
    internal JWT certificate and a self-signed certificate), adds the
    certificate watcher middleware to the task broker and declares the
    certificate discovery schedule.
    """

    name = "authentik.crypto"
    label = "authentik_crypto"
    verbose_name = "authentik Crypto"
    default = True

    def _create_update_cert(self):
        """Generate the internal JWT certificate and store it under MANAGED_KEY.

        An existing keypair with that managed key is overwritten with the
        freshly built certificate and private key; otherwise a new keypair
        is created.
        """
        # Builder and model are imported at call time, not at module import.
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        common_name = "authentik Internal JWT Certificate"
        builder = CertificateBuilder(common_name)
        builder.build(
            subject_alt_names=["goauthentik.io"],
            validity_days=360,
        )
        CertificateKeyPair.objects.update_or_create(
            managed=MANAGED_KEY,
            defaults={
                "name": common_name,
                "certificate_data": builder.certificate,
                "key_data": builder.private_key,
            },
        )

    @ManagedAppConfig.reconcile_tenant
    def managed_jwt_cert(self):
        """Ensure managed JWT certificate"""
        from authentik.crypto.models import CertificateKeyPair

        # Only existence matters here, so ask the database for exactly that
        # instead of loading the row (same check style as self_signed).
        if not CertificateKeyPair.objects.filter(managed=MANAGED_KEY).exists():
            self._create_update_cert()

    @ManagedAppConfig.reconcile_tenant
    def self_signed(self):
        """Create self-signed keypair"""
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        name = "authentik Self-signed Certificate"
        # Skip the certificate generation entirely when the keypair is there.
        if CertificateKeyPair.objects.filter(name=name).exists():
            return
        builder = CertificateBuilder(name)
        builder.build(subject_alt_names=[f"{generate_id()}.self-signed.goauthentik.io"])
        # get_or_create rather than create: if a keypair with this name showed
        # up since the check above, it is kept and the new one is discarded.
        CertificateKeyPair.objects.get_or_create(
            name=name,
            defaults={
                "certificate_data": builder.certificate,
                "key_data": builder.private_key,
            },
        )

    @ManagedAppConfig.reconcile_global
    def tasks_middlewares(self):
        """Add the certificate watcher middleware to the current broker"""
        from authentik.crypto.tasks import CertificateWatcherMiddleware

        get_broker().add_middleware(CertificateWatcherMiddleware())

    @property
    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
        """Schedule specs of this app: hourly certificate discovery"""
        from authentik.crypto.tasks import certificate_discovery

        return [
            ScheduleSpec(
                actor=certificate_discovery,
                # The first crontab field comes from fqdn_rand; the remaining
                # fields are wildcards, i.e. one run per hour.
                crontab=f"{fqdn_rand('crypto_certificate_discovery')} * * * *",
            ),
        ]
MANAGED_KEY = 'goauthentik.io/crypto/jwt-managed'
class AuthentikCryptoConfig(authentik.blueprints.apps.ManagedAppConfig):
class AuthentikCryptoConfig(ManagedAppConfig):
    """authentik crypto app config

    Keeps the certificate-keypairs owned by this app in place (the managed
    internal JWT certificate and a self-signed certificate), adds the
    certificate watcher middleware to the task broker and declares the
    certificate discovery schedule.
    """

    name = "authentik.crypto"
    label = "authentik_crypto"
    verbose_name = "authentik Crypto"
    default = True

    def _create_update_cert(self):
        """Generate the internal JWT certificate and store it under MANAGED_KEY.

        An existing keypair with that managed key is overwritten with the
        freshly built certificate and private key; otherwise a new keypair
        is created.
        """
        # Builder and model are imported at call time, not at module import.
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        common_name = "authentik Internal JWT Certificate"
        builder = CertificateBuilder(common_name)
        builder.build(
            subject_alt_names=["goauthentik.io"],
            validity_days=360,
        )
        CertificateKeyPair.objects.update_or_create(
            managed=MANAGED_KEY,
            defaults={
                "name": common_name,
                "certificate_data": builder.certificate,
                "key_data": builder.private_key,
            },
        )

    @ManagedAppConfig.reconcile_tenant
    def managed_jwt_cert(self):
        """Ensure managed JWT certificate"""
        from authentik.crypto.models import CertificateKeyPair

        # Only existence matters here, so ask the database for exactly that
        # instead of loading the row (same check style as self_signed).
        if not CertificateKeyPair.objects.filter(managed=MANAGED_KEY).exists():
            self._create_update_cert()

    @ManagedAppConfig.reconcile_tenant
    def self_signed(self):
        """Create self-signed keypair"""
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        name = "authentik Self-signed Certificate"
        # Skip the certificate generation entirely when the keypair is there.
        if CertificateKeyPair.objects.filter(name=name).exists():
            return
        builder = CertificateBuilder(name)
        builder.build(subject_alt_names=[f"{generate_id()}.self-signed.goauthentik.io"])
        # get_or_create rather than create: if a keypair with this name showed
        # up since the check above, it is kept and the new one is discarded.
        CertificateKeyPair.objects.get_or_create(
            name=name,
            defaults={
                "certificate_data": builder.certificate,
                "key_data": builder.private_key,
            },
        )

    @ManagedAppConfig.reconcile_global
    def tasks_middlewares(self):
        """Add the certificate watcher middleware to the current broker"""
        from authentik.crypto.tasks import CertificateWatcherMiddleware

        get_broker().add_middleware(CertificateWatcherMiddleware())

    @property
    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
        """Schedule specs of this app: hourly certificate discovery"""
        from authentik.crypto.tasks import certificate_discovery

        return [
            ScheduleSpec(
                actor=certificate_discovery,
                # The first crontab field comes from fqdn_rand; the remaining
                # fields are wildcards, i.e. one run per hour.
                crontab=f"{fqdn_rand('crypto_certificate_discovery')} * * * *",
            ),
        ]

authentik crypto app config

name = 'authentik.crypto'
label = 'authentik_crypto'
verbose_name = 'authentik Crypto'
default = True
@ManagedAppConfig.reconcile_tenant
def managed_jwt_cert(self):
@ManagedAppConfig.reconcile_tenant
def managed_jwt_cert(self):
    """Ensure managed JWT certificate

    Looks for a keypair stored under MANAGED_KEY and, when there is none,
    has _create_update_cert() build and save one.
    """
    from authentik.crypto.models import CertificateKeyPair

    # Only existence matters here, so ask the database for exactly that
    # instead of loading the row.
    if not CertificateKeyPair.objects.filter(managed=MANAGED_KEY).exists():
        self._create_update_cert()

Ensure the managed JWT certificate exists, creating it when it is missing.

@ManagedAppConfig.reconcile_tenant
def self_signed(self):
@ManagedAppConfig.reconcile_tenant
def self_signed(self):
    """Create the self-signed keypair unless one with its name exists"""
    from authentik.crypto.builder import CertificateBuilder
    from authentik.crypto.models import CertificateKeyPair

    cert_name = "authentik Self-signed Certificate"
    already_present = CertificateKeyPair.objects.filter(name=cert_name).exists()
    if not already_present:
        san = f"{generate_id()}.self-signed.goauthentik.io"
        cert_builder = CertificateBuilder(cert_name)
        cert_builder.build(subject_alt_names=[san])
        generated = {
            "certificate_data": cert_builder.certificate,
            "key_data": cert_builder.private_key,
        }
        # get_or_create keeps a keypair that appeared since the check above.
        CertificateKeyPair.objects.get_or_create(name=cert_name, defaults=generated)

Create the self-signed keypair if no keypair with its name exists yet.

@ManagedAppConfig.reconcile_global
def tasks_middlewares(self):
@ManagedAppConfig.reconcile_global
def tasks_middlewares(self):
    """Add the certificate watcher middleware to the current broker"""
    from authentik.crypto.tasks import CertificateWatcherMiddleware

    broker = get_broker()
    watcher = CertificateWatcherMiddleware()
    broker.add_middleware(watcher)
tenant_schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
@property
def tenant_schedule_specs(self) -> list[ScheduleSpec]:
    """Schedule specs of this app: hourly certificate discovery"""
    from authentik.crypto.tasks import certificate_discovery

    # First crontab field comes from fqdn_rand; the rest are wildcards.
    minute = fqdn_rand("crypto_certificate_discovery")
    discovery = ScheduleSpec(
        actor=certificate_discovery,
        crontab=f"{minute} * * * *",
    )
    return [discovery]

Get a list of schedule specs that must exist in each tenant