authentik.sources.ldap.tasks
LDAP Sync tasks
"""LDAP Sync tasks"""

from uuid import uuid4

from django.core.cache import cache
from django.utils.translation import gettext_lazy as _
from dramatiq.actor import actor
from dramatiq.composition import group
from dramatiq.message import Message
from ldap3.core.exceptions import LDAPException
from structlog.stdlib import get_logger

from authentik.lib.config import CONFIG
from authentik.lib.sync.incoming.models import SyncOutgoingTriggerMode
from authentik.lib.sync.outgoing.exceptions import StopSync
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
from authentik.lib.utils.reflection import all_subclasses, class_to_path, path_to_class
from authentik.sources.ldap.models import LDAPSource
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
from authentik.sources.ldap.sync.forward_delete_groups import GroupLDAPForwardDeletion
from authentik.sources.ldap.sync.forward_delete_users import UserLDAPForwardDeletion
from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
from authentik.tasks.middleware import CurrentTask
from authentik.tasks.models import Task

LOGGER = get_logger()
SYNC_CLASSES: list[type[BaseLDAPSynchronizer]] = [
    UserLDAPSynchronizer,
    GroupLDAPSynchronizer,
    MembershipLDAPSynchronizer,
]
CACHE_KEY_PREFIX = "goauthentik.io/sources/ldap/page/"
CACHE_KEY_STATUS = "goauthentik.io/sources/ldap/status/"


@actor(description=_("Check connectivity for LDAP source."))
def ldap_connectivity_check(pk: str | None = None):
    """Check connectivity for LDAP Sources

    Looks up the enabled source with primary key ``pk`` (doing nothing when there is
    none) and caches the result of ``source.check_connection()`` under
    ``CACHE_KEY_STATUS + source.slug`` for two hours.
    """
    timeout = 60 * 60 * 2
    source = LDAPSource.objects.filter(pk=pk, enabled=True).first()
    if not source:
        return
    status = source.check_connection()
    cache.set(CACHE_KEY_STATUS + source.slug, status, timeout=timeout)


@actor(
    # We take the configured hours timeout time by 3.5 as we run user and
    # group in parallel and then membership, then deletions, so 3x is to cover the serial tasks,
    # and 0.5x on top of that to give some more leeway
    time_limit=(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000) * 3.5,
    description=_("Sync LDAP source."),
)
def ldap_sync(source_pk: str):
    """Sync a single source

    Does nothing if the source does not exist or is disabled, and skips (with an info
    message on the task) if another sync already holds the source's ``sync_lock``.
    Otherwise runs three sequential phases, each a group of ``ldap_sync_page`` messages:
    users and groups together, then memberships, then forward deletions. Each phase is
    waited on for at most ``ldap.task_timeout_hours``. When the source's outgoing trigger
    mode is ``DEFERRED_END``, every outgoing sync provider is dispatched afterwards.
    """
    task = CurrentTask.get_task()
    source: LDAPSource = LDAPSource.objects.filter(pk=source_pk, enabled=True).first()
    if not source:
        return
    with source.sync_lock as lock_acquired:
        if not lock_acquired:
            task.info("Synchronization is already running. Skipping")
            LOGGER.debug("Failed to acquire lock for LDAP sync, skipping task", source=source.slug)
            return

        # Per-phase wait timeout, expressed in milliseconds like the actor time limits.
        phase_timeout_ms = 60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000

        # NOTE: all pages of all phases are fetched (and cached) here, before any phase
        # runs; the order of these calls is kept as-is since get_objects() may query LDAP
        # or touch the database.
        user_group_tasks = group(
            ldap_sync_paginator(task, source, UserLDAPSynchronizer)
            + ldap_sync_paginator(task, source, GroupLDAPSynchronizer)
        )

        membership_tasks = group(ldap_sync_paginator(task, source, MembershipLDAPSynchronizer))

        deletion_tasks = group(
            ldap_sync_paginator(task, source, UserLDAPForwardDeletion)
            + ldap_sync_paginator(task, source, GroupLDAPForwardDeletion),
        )

        # User and group sync can happen at once, they have no dependencies on each other
        user_group_tasks.run().wait(timeout=phase_timeout_ms)
        # Membership sync needs to run afterwards
        membership_tasks.run().wait(timeout=phase_timeout_ms)
        # Finally, deletions. What we'd really like to do here is something like
        # ```
        # user_identifiers = <ldap query>
        # User.objects.exclude(
        #     usersourceconnection__identifier__in=user_uniqueness_identifiers,
        # ).delete()
        # ```
        # This runs into performance issues in large installations. So instead we spread the
        # work out into three steps:
        # 1. Get every object from the LDAP source.
        # 2. Mark every object as "safe" in the database. This is quick, but any error could
        #    mean deleting users which should not be deleted, so we do it immediately, in
        #    large chunks, and only queue the deletion step afterwards.
        # 3. Delete every unmarked item. This is slow, so we spread it over many tasks in
        #    small chunks.
        deletion_tasks.run().wait(timeout=phase_timeout_ms)

        if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.DEFERRED_END:
            for outgoing_sync_provider_cls in all_subclasses(OutgoingSyncProvider):
                for provider in outgoing_sync_provider_cls.objects.all():
                    provider.sync_dispatch()


def ldap_sync_paginator(
    task: Task, source: LDAPSource, sync: type[BaseLDAPSynchronizer]
) -> list[Message]:
    """Return a list of task signatures with LDAP pagination data

    Every non-empty page yielded by ``sync(source, task).get_objects()`` is stored in the
    cache under ``CACHE_KEY_PREFIX`` plus a random UUID, and one ``ldap_sync_page`` message
    referencing that cache key is built for it. The messages are returned, not sent.

    Empty pages are skipped: ``ldap_sync_page`` treats a falsy cached value as a cache
    miss and would report an error for them.
    """
    sync_inst: BaseLDAPSynchronizer = sync(source, task)
    # ldap_sync fetches the pages of every phase up front, but membership and deletion
    # pages are only consumed after the earlier phases finished, each of which may take up
    # to ldap.task_timeout_hours. Keep pages cached for as long as the whole sync may run
    # (the 3.5x factor of ldap_sync's time limit) so they don't expire before use.
    page_timeout = int(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 3.5)
    messages = []
    for page in sync_inst.get_objects():
        if not page:
            continue
        page_uid = str(uuid4())
        page_cache_key = CACHE_KEY_PREFIX + page_uid
        cache.set(page_cache_key, page, page_timeout)
        page_sync = ldap_sync_page.message_with_options(
            args=(source.pk, class_to_path(sync), page_cache_key),
            rel_obj=task.rel_obj,
            uid=f"{source.slug}:{sync_inst.name()}:{page_uid}",
        )
        messages.append(page_sync)
    return messages


@actor(
    time_limit=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000,
    description=_("Sync page for LDAP source."),
)
def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str):
    """Synchronization of an LDAP Source

    Loads one page cached by ``ldap_sync_paginator`` from ``page_cache_key`` and passes
    it to ``sync()`` of the synchronizer class whose dotted path is ``sync_class``.
    Outgoing sync dispatch is inhibited unless the source's trigger mode is
    ``IMMEDIATE``. The cache entry is deleted after a successful sync.

    Raises:
        LDAPException, StopSync: re-raised after being logged and recorded on the task.
    """
    self = CurrentTask.get_task()
    source: LDAPSource = LDAPSource.objects.filter(pk=source_pk).first()
    if not source:
        # Because the source couldn't be found, we don't have a UID
        # to set the state with
        return
    sync: type[BaseLDAPSynchronizer] = path_to_class(sync_class)
    try:
        sync_inst: BaseLDAPSynchronizer = sync(source, self)
        page = cache.get(page_cache_key)
        if not page:
            error_message = (
                f"Could not find page in cache: {page_cache_key}. "
                + "Try increasing ldap.task_timeout_hours"
            )
            LOGGER.warning(error_message)
            self.error(error_message)
            return
        # Without an explicit timeout, touch() resets the key to the cache backend's
        # default timeout, which may be far shorter than the page's original lifetime.
        # Keep the page for another full task timeout so that, if this message is
        # retried after the exception re-raised below, the page can still be loaded.
        cache.touch(page_cache_key, 60 * 60 * CONFIG.get_int("ldap.task_timeout_hours"))
        if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.IMMEDIATE:
            count = sync_inst.sync(page)
        else:
            with sync_outgoing_inhibit_dispatch():
                count = sync_inst.sync(page)
        self.info(f"Synced {count} objects.")
        cache.delete(page_cache_key)
    except (LDAPException, StopSync) as exc:
        # No explicit event is created here as .error will do that
        LOGGER.warning("Failed to sync LDAP", exc=exc, source=source)
        self.error(exc)
        raise exc
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
SYNC_CLASSES: list[type[authentik.sources.ldap.sync.base.BaseLDAPSynchronizer]] =
[<class 'authentik.sources.ldap.sync.users.UserLDAPSynchronizer'>, <class 'authentik.sources.ldap.sync.groups.GroupLDAPSynchronizer'>, <class 'authentik.sources.ldap.sync.membership.MembershipLDAPSynchronizer'>]
CACHE_KEY_PREFIX =
'goauthentik.io/sources/ldap/page/'
CACHE_KEY_STATUS =
'goauthentik.io/sources/ldap/status/'
ldap_connectivity_check =
Actor(<function ldap_connectivity_check>, queue_name='default', actor_name='ldap_connectivity_check')
Check connectivity for LDAP Sources
ldap_sync =
Actor(<function ldap_sync>, queue_name='default', actor_name='ldap_sync')
Sync a single source
def
ldap_sync_paginator( task: authentik.tasks.models.Task, source: authentik.sources.ldap.models.LDAPSource, sync: type[authentik.sources.ldap.sync.base.BaseLDAPSynchronizer]) -> list[dramatiq.message.Message]:
def ldap_sync_paginator(
    task: Task, source: LDAPSource, sync: type[BaseLDAPSynchronizer]
) -> list[Message]:
    """Return a list of task signatures with LDAP pagination data

    Every non-empty page yielded by ``sync(source, task).get_objects()`` is stored in the
    cache under ``CACHE_KEY_PREFIX`` plus a random UUID, and one ``ldap_sync_page`` message
    referencing that cache key is built for it. The messages are returned, not sent.

    Empty pages are skipped: ``ldap_sync_page`` treats a falsy cached value as a cache
    miss and would report an error for them.
    """
    sync_inst: BaseLDAPSynchronizer = sync(source, task)
    # ldap_sync fetches the pages of every phase up front, but membership and deletion
    # pages are only consumed after the earlier phases finished, each of which may take up
    # to ldap.task_timeout_hours. Keep pages cached for as long as the whole sync may run
    # (the 3.5x factor of ldap_sync's time limit) so they don't expire before use.
    page_timeout = int(60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 3.5)
    messages = []
    for page in sync_inst.get_objects():
        if not page:
            continue
        page_uid = str(uuid4())
        page_cache_key = CACHE_KEY_PREFIX + page_uid
        cache.set(page_cache_key, page, page_timeout)
        page_sync = ldap_sync_page.message_with_options(
            args=(source.pk, class_to_path(sync), page_cache_key),
            rel_obj=task.rel_obj,
            uid=f"{source.slug}:{sync_inst.name()}:{page_uid}",
        )
        messages.append(page_sync)
    return messages
Return a list of task signatures with LDAP pagination data
ldap_sync_page =
Actor(<function ldap_sync_page>, queue_name='default', actor_name='ldap_sync_page')
Synchronization of an LDAP Source