authentik.sources.ldap.tasks

LDAP Sync tasks

  1"""LDAP Sync tasks"""
  2
  3from uuid import uuid4
  4
  5from django.core.cache import cache
  6from django.utils.translation import gettext_lazy as _
  7from dramatiq.actor import actor
  8from dramatiq.composition import group
  9from dramatiq.message import Message
 10from ldap3.core.exceptions import LDAPException
 11from structlog.stdlib import get_logger
 12
 13from authentik.lib.config import CONFIG
 14from authentik.lib.sync.incoming.models import SyncOutgoingTriggerMode
 15from authentik.lib.sync.outgoing.exceptions import StopSync
 16from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
 17from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
 18from authentik.lib.utils.reflection import all_subclasses, class_to_path, path_to_class
 19from authentik.sources.ldap.models import LDAPSource
 20from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
 21from authentik.sources.ldap.sync.forward_delete_groups import GroupLDAPForwardDeletion
 22from authentik.sources.ldap.sync.forward_delete_users import UserLDAPForwardDeletion
 23from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
 24from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
 25from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
 26from authentik.tasks.middleware import CurrentTask
 27from authentik.tasks.models import Task
 28
 29LOGGER = get_logger()
 30SYNC_CLASSES: list[type[BaseLDAPSynchronizer]] = [
 31    UserLDAPSynchronizer,
 32    GroupLDAPSynchronizer,
 33    MembershipLDAPSynchronizer,
 34]
 35CACHE_KEY_PREFIX = "goauthentik.io/sources/ldap/page/"
 36CACHE_KEY_STATUS = "goauthentik.io/sources/ldap/status/"
 37
 38
 39@actor(description=_("Check connectivity for LDAP source."))
 40def ldap_connectivity_check(pk: str | None = None):
 41    """Check connectivity for LDAP Sources"""
 42    timeout = 60 * 60 * 2
 43    source = LDAPSource.objects.filter(pk=pk, enabled=True).first()
 44    if not source:
 45        return
 46    status = source.check_connection()
 47    cache.set(CACHE_KEY_STATUS + source.slug, status, timeout=timeout)
 48
 49
 50@actor(
 51    # We take the configured hours timeout time by 3.5 as we run user and
 52    # group in parallel and then membership, then deletions, so 3x is to cover the serial tasks,
 53    # and 0.5x on top of that to give some more leeway
 54    time_limit=(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000) * 3.5,
 55    description=_("Sync LDAP source."),
 56)
 57def ldap_sync(source_pk: str):
 58    """Sync a single source"""
 59    task = CurrentTask.get_task()
 60    source: LDAPSource = LDAPSource.objects.filter(pk=source_pk, enabled=True).first()
 61    if not source:
 62        return
 63    with source.sync_lock as lock_acquired:
 64        if not lock_acquired:
 65            task.info("Synchronization is already running. Skipping")
 66            LOGGER.debug("Failed to acquire lock for LDAP sync, skipping task", source=source.slug)
 67            return
 68
 69        user_group_tasks = group(
 70            ldap_sync_paginator(task, source, UserLDAPSynchronizer)
 71            + ldap_sync_paginator(task, source, GroupLDAPSynchronizer)
 72        )
 73
 74        membership_tasks = group(ldap_sync_paginator(task, source, MembershipLDAPSynchronizer))
 75
 76        deletion_tasks = group(
 77            ldap_sync_paginator(task, source, UserLDAPForwardDeletion)
 78            + ldap_sync_paginator(task, source, GroupLDAPForwardDeletion),
 79        )
 80
 81        # User and group sync can happen at once, they have no dependencies on each other
 82        user_group_tasks.run().wait(
 83            timeout=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000
 84        )
 85        # Membership sync needs to run afterwards
 86        membership_tasks.run().wait(
 87            timeout=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000
 88        )
 89        # Finally, deletions. What we'd really like to do here is something like
 90        # ```
 91        # user_identifiers = <ldap query>
 92        # User.objects.exclude(
 93        #     usersourceconnection__identifier__in=user_uniqueness_identifiers,
 94        # ).delete()
 95        # ```
 96        # This runs into performance issues in large installations. So instead we spread the
 97        # work out into three steps:
 98        # 1. Get every object from the LDAP source.
 99        # 2. Mark every object as "safe" in the database. This is quick, but any error could
100        #    mean deleting users which should not be deleted, so we do it immediately, in
101        #    large chunks, and only queue the deletion step afterwards.
102        # 3. Delete every unmarked item. This is slow, so we spread it over many tasks in
103        #    small chunks.
104        deletion_tasks.run().wait(
105            timeout=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000,
106        )
107
108    if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.DEFERRED_END:
109        for outgoing_sync_provider_cls in all_subclasses(OutgoingSyncProvider):
110            for provider in outgoing_sync_provider_cls.objects.all():
111                provider.sync_dispatch()
112
113
def ldap_sync_paginator(
    task: Task, source: LDAPSource, sync: type[BaseLDAPSynchronizer]
) -> list[Message]:
    """Return a list of task signatures with LDAP pagination data

    Instantiates ``sync`` for ``source`` and iterates its ``get_objects()`` pages. Each
    page is stored in the cache under a random key, for ``ldap.task_timeout_hours``
    hours. For each page, an ``ldap_sync_page`` message is built that references the
    key and is tied to ``task.rel_obj``. The messages are returned, not enqueued.
    """
    synchronizer: BaseLDAPSynchronizer = sync(source, task)
    sync_path = class_to_path(sync)
    page_ttl_seconds = 60 * 60 * CONFIG.get_int("ldap.task_timeout_hours")
    page_messages: list[Message] = []
    for page in synchronizer.get_objects():
        page_id = str(uuid4())
        cache_key = f"{CACHE_KEY_PREFIX}{page_id}"
        cache.set(cache_key, page, page_ttl_seconds)
        page_messages.append(
            ldap_sync_page.message_with_options(
                args=(source.pk, sync_path, cache_key),
                rel_obj=task.rel_obj,
                uid=f"{source.slug}:{synchronizer.name()}:{page_id}",
            )
        )
    return page_messages
131
132
@actor(
    time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000,
    description=_("Sync page for LDAP source."),
)
def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str):
    """Synchronize one cached page of LDAP objects for an LDAP source.

    Args:
        source_pk: primary key of the LDAPSource; if it no longer exists the task exits.
        sync_class: dotted path of the BaseLDAPSynchronizer subclass to run.
        page_cache_key: cache key under which ldap_sync_paginator stored the page.

    If the page is missing from the cache, an error is recorded on the task and
    nothing is synced. The page is removed from the cache only after a successful sync.

    Raises:
        LDAPException, StopSync: re-raised after being logged and recorded on the task.
    """
    task = CurrentTask.get_task()
    source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first()
    if not source:
        # Because the source couldn't be found, we don't have a UID
        # to set the state with
        return
    sync: type[BaseLDAPSynchronizer] = path_to_class(sync_class)
    # Same lifetime ldap_sync_paginator uses when it stores the page.
    page_ttl_seconds = 60 * 60 * CONFIG.get_int("ldap.task_timeout_hours")
    try:
        sync_inst: BaseLDAPSynchronizer = sync(source, task)
        page = cache.get(page_cache_key)
        if not page:
            error_message = (
                f"Could not find page in cache: {page_cache_key}. "
                + "Try increasing ldap.task_timeout_hours"
            )
            LOGGER.warning(error_message)
            task.error(error_message)
            return
        # Without an explicit timeout, touch() resets the expiry to the cache's default
        # timeout, which can be much shorter than the page TTL. A retry after a failure
        # could then find the page gone.
        cache.touch(page_cache_key, page_ttl_seconds)
        if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.IMMEDIATE:
            count = sync_inst.sync(page)
        else:
            # Outgoing syncs are dispatched later (e.g. by ldap_sync for DEFERRED_END).
            with sync_outgoing_inhibit_dispatch():
                count = sync_inst.sync(page)
        task.info(f"Synced {count} objects.")
        cache.delete(page_cache_key)
    except (LDAPException, StopSync) as exc:
        # No explicit event is created here as .error will do that
        LOGGER.warning("Failed to sync LDAP", exc=exc, source=source)
        task.error(exc)
        raise
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_KEY_PREFIX = 'goauthentik.io/sources/ldap/page/'
CACHE_KEY_STATUS = 'goauthentik.io/sources/ldap/status/'
ldap_connectivity_check = Actor(<function ldap_connectivity_check>, queue_name='default', actor_name='ldap_connectivity_check')

Check connectivity for LDAP Sources

ldap_sync = Actor(<function ldap_sync>, queue_name='default', actor_name='ldap_sync')

Sync a single source

def ldap_sync_paginator( task: authentik.tasks.models.Task, source: authentik.sources.ldap.models.LDAPSource, sync: type[authentik.sources.ldap.sync.base.BaseLDAPSynchronizer]) -> list[dramatiq.message.Message]:
def ldap_sync_paginator(
    task: Task, source: LDAPSource, sync: type[BaseLDAPSynchronizer]
) -> list[Message]:
    """Return a list of task signatures with LDAP pagination data

    Instantiates ``sync`` for ``source`` and iterates its ``get_objects()`` pages. Each
    page is stored in the cache under a random key, for ``ldap.task_timeout_hours``
    hours. For each page, an ``ldap_sync_page`` message is built that references the
    key and is tied to ``task.rel_obj``. The messages are returned, not enqueued.
    """
    synchronizer: BaseLDAPSynchronizer = sync(source, task)
    sync_path = class_to_path(sync)
    page_ttl_seconds = 60 * 60 * CONFIG.get_int("ldap.task_timeout_hours")
    page_messages: list[Message] = []
    for page in synchronizer.get_objects():
        page_id = str(uuid4())
        cache_key = f"{CACHE_KEY_PREFIX}{page_id}"
        cache.set(cache_key, page, page_ttl_seconds)
        page_messages.append(
            ldap_sync_page.message_with_options(
                args=(source.pk, sync_path, cache_key),
                rel_obj=task.rel_obj,
                uid=f"{source.slug}:{synchronizer.name()}:{page_id}",
            )
        )
    return page_messages

Return a list of `ldap_sync_page` task messages, one per page of LDAP objects stored in the cache.

ldap_sync_page = Actor(<function ldap_sync_page>, queue_name='default', actor_name='ldap_sync_page')

Synchronize one cached page of LDAP objects for an LDAP source.