authentik.sources.ldap.management.commands.ldap_sync

LDAP Sync

 1"""LDAP Sync"""
 2
 3from structlog.stdlib import get_logger
 4
 5from authentik.sources.ldap.models import LDAPSource
 6from authentik.tenants.management import TenantCommand
 7
 8LOGGER = get_logger()
 9
10
class Command(TenantCommand):
    """Management command that runs the sync schedules of one or more LDAP sources.

    Sources are selected by slug on the command line.
    """

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (one or more slugs)."""
        parser.add_argument("source_slugs", type=str, nargs="+")

    def handle_per_tenant(self, **options):
        """Send every schedule of each requested LDAP source and fetch its result.

        ``options["source_slugs"]`` holds the slugs given on the command line.
        A slug with no matching ``LDAPSource`` is logged as a warning and
        skipped; the remaining slugs are still processed.
        """
        for slug in options["source_slugs"]:
            ldap_source = LDAPSource.objects.filter(slug=slug).first()
            if ldap_source:
                for sync_schedule in ldap_source.schedules.all():
                    # Schedules are handled one after another: each is sent and
                    # its result retrieved before moving on to the next.
                    dispatched = sync_schedule.send()
                    dispatched.get_result()
            else:
                LOGGER.warning("Source does not exist", slug=slug)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(authentik.tenants.management.TenantCommand):
class Command(TenantCommand):
    """Management command that runs the sync schedules of one or more LDAP sources.

    Sources are selected by slug on the command line.
    """

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (one or more slugs)."""
        parser.add_argument("source_slugs", type=str, nargs="+")

    def handle_per_tenant(self, **options):
        """Send every schedule of each requested LDAP source and fetch its result.

        ``options["source_slugs"]`` holds the slugs given on the command line.
        A slug with no matching ``LDAPSource`` is logged as a warning and
        skipped; the remaining slugs are still processed.
        """
        for slug in options["source_slugs"]:
            ldap_source = LDAPSource.objects.filter(slug=slug).first()
            if ldap_source:
                for sync_schedule in ldap_source.schedules.all():
                    # Schedules are handled one after another: each is sent and
                    # its result retrieved before moving on to the next.
                    dispatched = sync_schedule.send()
                    dispatched.get_result()
            else:
                LOGGER.warning("Source does not exist", slug=slug)

Run sync for an LDAP Source

def add_arguments(self, parser):
15    def add_arguments(self, parser):
16        parser.add_argument("source_slugs", nargs="+", type=str)

Entry point for subclassed commands to add custom arguments.

def handle_per_tenant(self, **options):
18    def handle_per_tenant(self, **options):
19        for source_slug in options["source_slugs"]:
20            source = LDAPSource.objects.filter(slug=source_slug).first()
21            if not source:
22                LOGGER.warning("Source does not exist", slug=source_slug)
23                continue
24            for schedule in source.schedules.all():
25                schedule.send().get_result()

The actual logic of the command.