authentik.sources.ldap.management.commands.ldap_sync
LDAP Sync
"""LDAP Sync management command."""

from structlog.stdlib import get_logger

from authentik.sources.ldap.models import LDAPSource
from authentik.tenants.management import TenantCommand

LOGGER = get_logger()


class Command(TenantCommand):
    """Trigger the sync schedules of one or more LDAP sources, selected by slug."""

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (one or more strings)."""
        parser.add_argument("source_slugs", type=str, nargs="+")

    def handle_per_tenant(self, **options):
        """Send every schedule of each requested source and fetch its result.

        Slugs that match no ``LDAPSource`` are logged as a warning and skipped.
        """
        for slug in options["source_slugs"]:
            ldap_source = LDAPSource.objects.filter(slug=slug).first()
            if ldap_source:
                for schedule in ldap_source.schedules.all():
                    schedule.send().get_result()
            else:
                LOGGER.warning("Source does not exist", slug=slug)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(TenantCommand):
    """Trigger the sync schedules of one or more LDAP sources, selected by slug."""

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (one or more strings)."""
        parser.add_argument("source_slugs", type=str, nargs="+")

    def handle_per_tenant(self, **options):
        """Send every schedule of each requested source and fetch its result.

        Slugs that match no ``LDAPSource`` are logged as a warning and skipped.
        """
        for slug in options["source_slugs"]:
            ldap_source = LDAPSource.objects.filter(slug=slug).first()
            if ldap_source:
                for schedule in ldap_source.schedules.all():
                    schedule.send().get_result()
            else:
                LOGGER.warning("Source does not exist", slug=slug)
Run sync for an LDAP Source
def
handle_per_tenant(self, **options):
def handle_per_tenant(self, **options):
    """Send every schedule of each requested source and fetch its result.

    Reads the slugs from ``options["source_slugs"]``. A slug that matches no
    ``LDAPSource`` is logged as a warning and skipped; the remaining slugs
    are still processed.
    """
    for slug in options["source_slugs"]:
        ldap_source = LDAPSource.objects.filter(slug=slug).first()
        if ldap_source:
            # Each schedule is sent and its result fetched before the next one.
            for schedule in ldap_source.schedules.all():
                schedule.send().get_result()
        else:
            LOGGER.warning("Source does not exist", slug=slug)
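The actual logic of the command: for each slug in `source_slugs`, look up the matching LDAP source, call `send()` on each of its schedules and fetch the result with `get_result()`. A slug with no matching source is logged as a warning and skipped.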