authentik.crypto.apps
authentik crypto app config
"""authentik crypto app config"""

from dramatiq.broker import get_broker

from authentik.blueprints.apps import ManagedAppConfig
from authentik.lib.generators import generate_id
from authentik.lib.utils.time import fqdn_rand
from authentik.tasks.schedules.common import ScheduleSpec

MANAGED_KEY = "goauthentik.io/crypto/jwt-managed"


class AuthentikCryptoConfig(ManagedAppConfig):
    """App config for the authentik crypto app.

    Besides the usual app metadata, this class carries two methods decorated
    with ``reconcile_tenant`` that create missing keypairs, one method
    decorated with ``reconcile_global`` that adds a middleware to the
    dramatiq broker, and the schedule spec for certificate discovery.
    """

    name = "authentik.crypto"
    label = "authentik_crypto"
    verbose_name = "authentik Crypto"
    default = True

    def _create_update_cert(self):
        """Build a fresh internal JWT certificate and store it under MANAGED_KEY."""
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        jwt_cert_name = "authentik Internal JWT Certificate"
        cert_builder = CertificateBuilder(jwt_cert_name)
        cert_builder.build(subject_alt_names=["goauthentik.io"], validity_days=360)

        keypair_fields = {
            "name": jwt_cert_name,
            "certificate_data": cert_builder.certificate,
            "key_data": cert_builder.private_key,
        }
        # Looked up by the managed identifier, so an existing row is
        # overwritten rather than duplicated.
        CertificateKeyPair.objects.update_or_create(
            managed=MANAGED_KEY,
            defaults=keypair_fields,
        )

    @ManagedAppConfig.reconcile_tenant
    def managed_jwt_cert(self):
        """Create the managed JWT keypair when no keypair with MANAGED_KEY exists."""
        from authentik.crypto.models import CertificateKeyPair

        managed_certs = CertificateKeyPair.objects.filter(managed=MANAGED_KEY)
        existing: CertificateKeyPair | None = managed_certs.first()
        if existing:
            return
        self._create_update_cert()

    @ManagedAppConfig.reconcile_tenant
    def self_signed(self):
        """Create the self-signed keypair unless one with its name already exists."""
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        cert_name = "authentik Self-signed Certificate"
        already_present = CertificateKeyPair.objects.filter(name=cert_name).exists()
        if already_present:
            # Return before building so no certificate is generated needlessly.
            return

        cert_builder = CertificateBuilder(cert_name)
        alt_name = f"{generate_id()}.self-signed.goauthentik.io"
        cert_builder.build(subject_alt_names=[alt_name])
        keypair_fields = {
            "certificate_data": cert_builder.certificate,
            "key_data": cert_builder.private_key,
        }
        CertificateKeyPair.objects.get_or_create(name=cert_name, defaults=keypair_fields)

    @ManagedAppConfig.reconcile_global
    def tasks_middlewares(self):
        """Add a CertificateWatcherMiddleware instance to the dramatiq broker."""
        from authentik.crypto.tasks import CertificateWatcherMiddleware

        broker = get_broker()
        broker.add_middleware(CertificateWatcherMiddleware())

    @property
    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
        """Return the schedule specs of this app: one for certificate discovery.

        The first crontab field is taken from ``fqdn_rand``; the remaining
        four fields are wildcards.
        """
        from authentik.crypto.tasks import certificate_discovery

        first_field = fqdn_rand("crypto_certificate_discovery")
        discovery_spec = ScheduleSpec(
            actor=certificate_discovery,
            crontab=f"{first_field} * * * *",
        )
        return [discovery_spec]
MANAGED_KEY = 'goauthentik.io/crypto/jwt-managed'
class AuthentikCryptoConfig(ManagedAppConfig):
    """App config for the authentik crypto app.

    Besides the usual app metadata, this class carries two methods decorated
    with ``reconcile_tenant`` that create missing keypairs, one method
    decorated with ``reconcile_global`` that adds a middleware to the
    dramatiq broker, and the schedule spec for certificate discovery.
    """

    name = "authentik.crypto"
    label = "authentik_crypto"
    verbose_name = "authentik Crypto"
    default = True

    def _create_update_cert(self):
        """Build a fresh internal JWT certificate and store it under MANAGED_KEY."""
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        jwt_cert_name = "authentik Internal JWT Certificate"
        cert_builder = CertificateBuilder(jwt_cert_name)
        cert_builder.build(subject_alt_names=["goauthentik.io"], validity_days=360)

        keypair_fields = {
            "name": jwt_cert_name,
            "certificate_data": cert_builder.certificate,
            "key_data": cert_builder.private_key,
        }
        # Looked up by the managed identifier, so an existing row is
        # overwritten rather than duplicated.
        CertificateKeyPair.objects.update_or_create(
            managed=MANAGED_KEY,
            defaults=keypair_fields,
        )

    @ManagedAppConfig.reconcile_tenant
    def managed_jwt_cert(self):
        """Create the managed JWT keypair when no keypair with MANAGED_KEY exists."""
        from authentik.crypto.models import CertificateKeyPair

        managed_certs = CertificateKeyPair.objects.filter(managed=MANAGED_KEY)
        existing: CertificateKeyPair | None = managed_certs.first()
        if existing:
            return
        self._create_update_cert()

    @ManagedAppConfig.reconcile_tenant
    def self_signed(self):
        """Create the self-signed keypair unless one with its name already exists."""
        from authentik.crypto.builder import CertificateBuilder
        from authentik.crypto.models import CertificateKeyPair

        cert_name = "authentik Self-signed Certificate"
        already_present = CertificateKeyPair.objects.filter(name=cert_name).exists()
        if already_present:
            # Return before building so no certificate is generated needlessly.
            return

        cert_builder = CertificateBuilder(cert_name)
        alt_name = f"{generate_id()}.self-signed.goauthentik.io"
        cert_builder.build(subject_alt_names=[alt_name])
        keypair_fields = {
            "certificate_data": cert_builder.certificate,
            "key_data": cert_builder.private_key,
        }
        CertificateKeyPair.objects.get_or_create(name=cert_name, defaults=keypair_fields)

    @ManagedAppConfig.reconcile_global
    def tasks_middlewares(self):
        """Add a CertificateWatcherMiddleware instance to the dramatiq broker."""
        from authentik.crypto.tasks import CertificateWatcherMiddleware

        broker = get_broker()
        broker.add_middleware(CertificateWatcherMiddleware())

    @property
    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
        """Return the schedule specs of this app: one for certificate discovery.

        The first crontab field is taken from ``fqdn_rand``; the remaining
        four fields are wildcards.
        """
        from authentik.crypto.tasks import certificate_discovery

        first_field = fqdn_rand("crypto_certificate_discovery")
        discovery_spec = ScheduleSpec(
            actor=certificate_discovery,
            crontab=f"{first_field} * * * *",
        )
        return [discovery_spec]
authentik crypto app config
name = 'authentik.crypto'
@ManagedAppConfig.reconcile_tenant
def managed_jwt_cert(self):
@ManagedAppConfig.reconcile_tenant
def managed_jwt_cert(self):
    """Create the managed JWT keypair when no keypair with MANAGED_KEY exists."""
    from authentik.crypto.models import CertificateKeyPair

    managed_certs = CertificateKeyPair.objects.filter(managed=MANAGED_KEY)
    existing: CertificateKeyPair | None = managed_certs.first()
    if existing:
        # A managed keypair is already stored; nothing to create.
        return
    self._create_update_cert()
Ensure managed JWT certificate
@ManagedAppConfig.reconcile_tenant
def self_signed(self):
@ManagedAppConfig.reconcile_tenant
def self_signed(self):
    """Create the self-signed keypair unless one with its name already exists."""
    from authentik.crypto.builder import CertificateBuilder
    from authentik.crypto.models import CertificateKeyPair

    cert_name = "authentik Self-signed Certificate"
    already_present = CertificateKeyPair.objects.filter(name=cert_name).exists()
    if already_present:
        # Return before building so no certificate is generated needlessly.
        return

    cert_builder = CertificateBuilder(cert_name)
    alt_name = f"{generate_id()}.self-signed.goauthentik.io"
    cert_builder.build(subject_alt_names=[alt_name])
    keypair_fields = {
        "certificate_data": cert_builder.certificate,
        "key_data": cert_builder.private_key,
    }
    CertificateKeyPair.objects.get_or_create(name=cert_name, defaults=keypair_fields)
Create self-signed keypair
tenant_schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
@property
def tenant_schedule_specs(self) -> list[ScheduleSpec]:
    """Return the schedule specs of this app: one for certificate discovery.

    The first crontab field is taken from ``fqdn_rand``; the remaining
    four fields are wildcards.
    """
    from authentik.crypto.tasks import certificate_discovery

    first_field = fqdn_rand("crypto_certificate_discovery")
    discovery_spec = ScheduleSpec(
        actor=certificate_discovery,
        crontab=f"{first_field} * * * *",
    )
    return [discovery_spec]
Get a list of schedule specs that must exist in each tenant