authentik.sources.kerberos.tasks
Kerberos Sync tasks
"""Kerberos Sync tasks"""

from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from dramatiq.actor import actor
from structlog.stdlib import get_logger

from authentik.lib.config import CONFIG
from authentik.lib.sync.incoming.models import SyncOutgoingTriggerMode
from authentik.lib.sync.outgoing.exceptions import StopSync
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
from authentik.lib.utils.reflection import all_subclasses
from authentik.sources.kerberos.models import KerberosSource
from authentik.sources.kerberos.sync import KerberosSync
from authentik.tasks.middleware import CurrentTask

LOGGER = get_logger()
# Prefix of the cache key under which a source's connectivity status is stored;
# the source slug is appended to it.
CACHE_KEY_STATUS = "goauthentik.io/sources/kerberos/status/"


@actor(description=_("Check connectivity for Kerberos sources."))
def kerberos_connectivity_check(pk: str):
    """Run the connection check of one enabled Kerberos source and cache the result.

    The source is looked up by ``pk`` among enabled sources only; when none
    matches, the task returns without touching the cache.
    """
    # Cached result lives for 2 hours; this task is meant to run every hour.
    cache_ttl = 60 * 60 * 2
    kerberos_source = KerberosSource.objects.filter(enabled=True, pk=pk).first()
    if not kerberos_source:
        return
    connection_status = kerberos_source.check_connection()
    cache.set(
        CACHE_KEY_STATUS + kerberos_source.slug,
        connection_status,
        timeout=cache_ttl,
    )


@actor(
    # Configured hours -> seconds, scaled by 2.5, then by 1000
    # (presumably milliseconds, the unit Dramatiq uses for time_limit - confirm).
    time_limit=(60 * 60 * CONFIG.get_int("sources.kerberos.task_timeout_hours")) * 2.5 * 1000,
    description=_("Sync Kerberos source."),
)
def kerberos_sync(pk: str):
    """Synchronize the enabled Kerberos source identified by ``pk``.

    Returns early when no enabled source matches or when the source's sync
    lock could not be acquired. Unless the source's outgoing trigger mode is
    ``IMMEDIATE``, the sync runs with outgoing dispatch inhibited; for
    ``DEFERRED_END`` every outgoing sync provider is dispatched once the sync
    has finished.

    Raises:
        StopSync: re-raised after being logged and recorded on the current task.
    """
    task = CurrentTask.get_task()
    source: KerberosSource = KerberosSource.objects.filter(enabled=True, pk=pk).first()
    if not source:
        return
    try:
        with source.sync_lock as lock_acquired:
            if not lock_acquired:
                # Another run holds the lock: note it on the task and bail out.
                task.info("Synchronization is already running. Skipping")
                LOGGER.debug(
                    "Failed to acquire lock for Kerberos sync, skipping task", source=source.slug
                )
                return

            syncer = KerberosSync(source, task)

            if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.IMMEDIATE:
                syncer.sync()
                return

            # Any other mode: keep outgoing dispatch inhibited while syncing.
            with sync_outgoing_inhibit_dispatch():
                syncer.sync()

            if source.sync_outgoing_trigger_mode != SyncOutgoingTriggerMode.DEFERRED_END:
                return

            # Deferred-to-end mode: dispatch every outgoing provider now.
            for provider_cls in all_subclasses(OutgoingSyncProvider):
                for provider in provider_cls.objects.all():
                    provider.sync_dispatch()
    except StopSync as exc:
        LOGGER.warning("Error syncing kerberos", exc=exc, source=source)
        task.error(exc)
        raise exc
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_KEY_STATUS =
'goauthentik.io/sources/kerberos/status/'
kerberos_connectivity_check =
Actor(<function kerberos_connectivity_check>, queue_name='default', actor_name='kerberos_connectivity_check')
Check connectivity for Kerberos Sources