authentik.sources.ldap.management.commands.ldap_check_connection

LDAP Connection check

 1"""LDAP Connection check"""
 2
 3from json import dumps
 4
 5from structlog.stdlib import get_logger
 6
 7from authentik.sources.ldap.models import LDAPSource
 8from authentik.tenants.management import TenantCommand
 9
10LOGGER = get_logger()
11
12
13class Command(TenantCommand):
14    """Check connectivity to LDAP servers for a source"""
15
16    def add_arguments(self, parser):
17        parser.add_argument("source_slugs", nargs="?", type=str)
18
19    def handle_per_tenant(self, **options):
20        sources = LDAPSource.objects.filter(enabled=True)
21        if options["source_slugs"]:
22            sources = LDAPSource.objects.filter(slug__in=options["source_slugs"])
23        for source in sources.order_by("slug"):
24            status = source.check_connection()
25            self.stdout.write(dumps(status, indent=4))
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(authentik.tenants.management.TenantCommand):
14class Command(TenantCommand):
15    """Check connectivity to LDAP servers for a source"""
16
17    def add_arguments(self, parser):
18        parser.add_argument("source_slugs", nargs="?", type=str)
19
20    def handle_per_tenant(self, **options):
21        sources = LDAPSource.objects.filter(enabled=True)
22        if options["source_slugs"]:
23            sources = LDAPSource.objects.filter(slug__in=options["source_slugs"])
24        for source in sources.order_by("slug"):
25            status = source.check_connection()
26            self.stdout.write(dumps(status, indent=4))

Check connectivity to LDAP servers for a source

def add_arguments(self, parser):
17    def add_arguments(self, parser):
18        parser.add_argument("source_slugs", nargs="?", type=str)

Entry point for subclassed commands to add custom arguments.

def handle_per_tenant(self, **options):
20    def handle_per_tenant(self, **options):
21        sources = LDAPSource.objects.filter(enabled=True)
22        if options["source_slugs"]:
23            sources = LDAPSource.objects.filter(slug__in=options["source_slugs"])
24        for source in sources.order_by("slug"):
25            status = source.check_connection()
26            self.stdout.write(dumps(status, indent=4))

The actual logic of the command.