authentik.sources.kerberos.management.commands.kerberos_check_connection
Kerberos Connection check
"""Kerberos Connection check"""

from json import dumps

from structlog.stdlib import get_logger

from authentik.sources.kerberos.models import KerberosSource
from authentik.tenants.management import TenantCommand

LOGGER = get_logger()


class Command(TenantCommand):
    """Check connectivity to Kerberos servers for a source"""

    def add_arguments(self, parser):
        """Register the optional positional ``source_slugs`` argument.

        Zero or more slugs are accepted. With ``nargs="?"`` argparse would hand
        over a single ``str``, which a ``slug__in`` lookup then iterates
        character by character.
        """
        parser.add_argument("source_slugs", nargs="*", type=str)

    def handle_per_tenant(self, **options):
        """Check each selected Kerberos source and write its status as JSON.

        Without ``source_slugs`` every enabled source is checked. With slugs,
        exactly the sources matching those slugs are checked; the ``enabled``
        filter is not applied in that case. Sources are processed in slug
        order and each ``check_connection()`` result is written to
        ``self.stdout`` as indented JSON.
        """
        slugs = options.get("source_slugs")
        # Programmatic callers may still pass a single slug as a bare string;
        # wrap it so the ``__in`` lookup does not split it into characters.
        if isinstance(slugs, str):
            slugs = [slugs]

        if slugs:
            sources = KerberosSource.objects.filter(slug__in=slugs)
        else:
            sources = KerberosSource.objects.filter(enabled=True)

        for source in sources.order_by("slug"):
            status = source.check_connection()
            self.stdout.write(dumps(status, indent=4))
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(TenantCommand):
    """Check connectivity to Kerberos servers for a source"""

    def add_arguments(self, parser):
        """Register the optional positional ``source_slugs`` argument.

        Zero or more slugs are accepted. With ``nargs="?"`` argparse would hand
        over a single ``str``, which a ``slug__in`` lookup then iterates
        character by character.
        """
        parser.add_argument("source_slugs", nargs="*", type=str)

    def handle_per_tenant(self, **options):
        """Check each selected Kerberos source and write its status as JSON.

        Without ``source_slugs`` every enabled source is checked. With slugs,
        exactly the sources matching those slugs are checked; the ``enabled``
        filter is not applied in that case. Sources are processed in slug
        order and each ``check_connection()`` result is written to
        ``self.stdout`` as indented JSON.
        """
        requested = options.get("source_slugs")
        # A single slug passed as a bare string must be wrapped, otherwise the
        # ``__in`` lookup would match against its individual characters.
        if isinstance(requested, str):
            requested = [requested]

        if requested:
            sources = KerberosSource.objects.filter(slug__in=requested)
        else:
            sources = KerberosSource.objects.filter(enabled=True)

        for source in sources.order_by("slug"):
            status = source.check_connection()
            self.stdout.write(dumps(status, indent=4))
Check connectivity to Kerberos servers for a source
def
handle_per_tenant(self, **options):
def handle_per_tenant(self, **options):
    """Check each selected Kerberos source and write its status as JSON.

    Without ``source_slugs`` in ``options`` every enabled source is checked.
    With slugs, exactly the sources matching those slugs are checked; the
    ``enabled`` filter is not applied in that case. Sources are processed in
    slug order and each ``check_connection()`` result is written to
    ``self.stdout`` as indented JSON.
    """
    slugs = options.get("source_slugs")
    # A lone slug arrives as a plain string; an ``__in`` lookup would iterate
    # it character by character, so wrap it in a list first.
    if isinstance(slugs, str):
        slugs = [slugs]

    if slugs:
        sources = KerberosSource.objects.filter(slug__in=slugs)
    else:
        sources = KerberosSource.objects.filter(enabled=True)

    for source in sources.order_by("slug"):
        status = source.check_connection()
        self.stdout.write(dumps(status, indent=4))
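The actual logic of the command: checks the connection of every enabled Kerberos source (or only the sources whose slugs were given) in slug order, and writes each status to stdout as indented JSON.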