authentik.sources.kerberos.management.commands.kerberos_check_connection

Kerberos Connection check

 1"""Kerberos Connection check"""
 2
 3from json import dumps
 4
 5from structlog.stdlib import get_logger
 6
 7from authentik.sources.kerberos.models import KerberosSource
 8from authentik.tenants.management import TenantCommand
 9
10LOGGER = get_logger()
11
12
class Command(TenantCommand):
    """Check connectivity to Kerberos servers for a source"""

    def add_arguments(self, parser):
        """Register the optional positional ``source_slugs`` argument.

        Zero or more slugs may be passed; argparse collects them into a list
        (empty when none are given).
        """
        # nargs="*" yields a list. The former nargs="?" yielded a bare string,
        # which the ``slug__in`` lookup below would iterate character by
        # character instead of matching the whole slug.
        parser.add_argument("source_slugs", nargs="*", type=str)

    def handle_per_tenant(self, **options):
        """Write the connection status of each selected source to stdout.

        With no slugs, every enabled ``KerberosSource`` is checked. With
        slugs, exactly the sources with those slugs are checked; the
        ``enabled`` filter is not applied in that case. Sources are processed
        in slug order and each ``check_connection()`` result is written as
        JSON indented by 4 spaces.
        """
        slugs = options.get("source_slugs")
        if isinstance(slugs, str):
            # Tolerate a single slug passed as a plain string (e.g. by a
            # programmatic caller) so it is matched as a whole.
            slugs = [slugs]
        if slugs:
            sources = KerberosSource.objects.filter(slug__in=slugs)
        else:
            sources = KerberosSource.objects.filter(enabled=True)
        for source in sources.order_by("slug"):
            status = source.check_connection()
            self.stdout.write(dumps(status, indent=4))
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(authentik.tenants.management.TenantCommand):
class Command(TenantCommand):
    """Check connectivity to Kerberos servers for a source"""

    def add_arguments(self, parser):
        """Register the optional positional ``source_slugs`` argument.

        Zero or more slugs may be passed; argparse collects them into a list
        (empty when none are given).
        """
        # nargs="*" yields a list. The former nargs="?" yielded a bare string,
        # which the ``slug__in`` lookup below would iterate character by
        # character instead of matching the whole slug.
        parser.add_argument("source_slugs", nargs="*", type=str)

    def handle_per_tenant(self, **options):
        """Write the connection status of each selected source to stdout.

        With no slugs, every enabled ``KerberosSource`` is checked. With
        slugs, exactly the sources with those slugs are checked; the
        ``enabled`` filter is not applied in that case. Sources are processed
        in slug order and each ``check_connection()`` result is written as
        JSON indented by 4 spaces.
        """
        slugs = options.get("source_slugs")
        if isinstance(slugs, str):
            # Tolerate a single slug passed as a plain string (e.g. by a
            # programmatic caller) so it is matched as a whole.
            slugs = [slugs]
        if slugs:
            sources = KerberosSource.objects.filter(slug__in=slugs)
        else:
            sources = KerberosSource.objects.filter(enabled=True)
        for source in sources.order_by("slug"):
            status = source.check_connection()
            self.stdout.write(dumps(status, indent=4))

Check connectivity to Kerberos servers for a source

def add_arguments(self, parser):
17    def add_arguments(self, parser):
18        parser.add_argument("source_slugs", nargs="?", type=str)

Entry point for subclassed commands to add custom arguments.

def handle_per_tenant(self, **options):
20    def handle_per_tenant(self, **options):
21        sources = KerberosSource.objects.filter(enabled=True)
22        if options["source_slugs"]:
23            sources = KerberosSource.objects.filter(slug__in=options["source_slugs"])
24        for source in sources.order_by("slug"):
25            status = source.check_connection()
26            self.stdout.write(dumps(status, indent=4))

The actual logic of the command.