authentik.sources.kerberos.tasks

Kerberos Sync tasks

 1"""Kerberos Sync tasks"""
 2
 3from django.core.cache import cache
 4from django.utils.translation import gettext_lazy as _
 5from dramatiq.actor import actor
 6from structlog.stdlib import get_logger
 7
 8from authentik.lib.config import CONFIG
 9from authentik.lib.sync.incoming.models import SyncOutgoingTriggerMode
10from authentik.lib.sync.outgoing.exceptions import StopSync
11from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
12from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
13from authentik.lib.utils.reflection import all_subclasses
14from authentik.sources.kerberos.models import KerberosSource
15from authentik.sources.kerberos.sync import KerberosSync
16from authentik.tasks.middleware import CurrentTask
17
 # Module-level structlog logger (obtained via structlog.stdlib.get_logger).
 18LOGGER = get_logger()
 # Prefix of the cache key holding a source's connectivity status; the
 # source slug is appended to it when the status is written.
 19CACHE_KEY_STATUS = "goauthentik.io/sources/kerberos/status/"
20
21
@actor(description=_("Check connectivity for Kerberos sources."))
def kerberos_connectivity_check(pk: str):
    """Check the connection of one enabled Kerberos source and cache the result.

    The enabled ``KerberosSource`` whose primary key is ``pk`` is looked up;
    when no such source exists the task returns without touching the cache.
    Otherwise the value returned by ``source.check_connection()`` is stored
    under ``CACHE_KEY_STATUS`` followed by the source slug.
    """
    enabled_sources = KerberosSource.objects.filter(enabled=True, pk=pk)
    source = enabled_sources.first()
    if not source:
        return

    # Entries live for two hours; this task should run every hour, so a
    # fresh status normally replaces the old one before it expires.
    cache_ttl_seconds = 2 * 60 * 60

    connection_status = source.check_connection()
    status_cache_key = CACHE_KEY_STATUS + source.slug
    cache.set(status_cache_key, connection_status, timeout=cache_ttl_seconds)
32
33
@actor(
    # Configured timeout (hours) turned into seconds and padded by a factor of
    # 2.5; the trailing * 1000 presumably converts to the milliseconds that
    # dramatiq expects for time_limit -- confirm against the dramatiq docs.
    time_limit=(60 * 60 * CONFIG.get_int("sources.kerberos.task_timeout_hours")) * 2.5 * 1000,
    description=_("Sync Kerberos source."),
)
def kerberos_sync(pk: str):
    """Run a synchronization for the enabled Kerberos source with primary key ``pk``.

    Steps, in order:

    * Return immediately when no enabled ``KerberosSource`` matches ``pk``.
    * Enter ``source.sync_lock``; when the lock is not acquired, note that on
      the current task and in the debug log, then return.
    * Run ``KerberosSync(source, task).sync()``. Unless the source's outgoing
      trigger mode is ``IMMEDIATE``, the run is wrapped in
      ``sync_outgoing_inhibit_dispatch()``.
    * Once the lock context has exited, and only for the ``DEFERRED_END``
      trigger mode, call ``sync_dispatch()`` on every object of every
      ``OutgoingSyncProvider`` subclass.

    Raises:
        StopSync: logged as a warning, recorded on the current task and then
            re-raised to the caller.
    """
    task = CurrentTask.get_task()
    matching_sources = KerberosSource.objects.filter(enabled=True, pk=pk)
    source: KerberosSource = matching_sources.first()
    if not source:
        return

    try:
        with source.sync_lock as lock_acquired:
            if not lock_acquired:
                # Another run holds the lock; skip this one.
                task.info("Synchronization is already running. Skipping")
                LOGGER.debug(
                    "Failed to acquire lock for Kerberos sync, skipping task", source=source.slug
                )
                return

            sync_run = KerberosSync(source, task)
            dispatch_immediately = (
                source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.IMMEDIATE
            )
            if not dispatch_immediately:
                # Hold back outgoing-sync dispatching for the duration of the run.
                with sync_outgoing_inhibit_dispatch():
                    sync_run.sync()
            else:
                sync_run.sync()

        # The lock context has exited here; deferred dispatch happens only now.
        if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.DEFERRED_END:
            for provider_cls in all_subclasses(OutgoingSyncProvider):
                for outgoing_provider in provider_cls.objects.all():
                    outgoing_provider.sync_dispatch()
    except StopSync as exc:
        LOGGER.warning("Error syncing kerberos", exc=exc, source=source)
        task.error(exc)
        raise exc
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_KEY_STATUS = 'goauthentik.io/sources/kerberos/status/'
kerberos_connectivity_check = Actor(<function kerberos_connectivity_check>, queue_name='default', actor_name='kerberos_connectivity_check')

Check connectivity for Kerberos Sources