authentik.sources.kerberos.management.commands.kerberos_sync
Kerberos Sync
"""Kerberos Sync"""

from structlog.stdlib import get_logger

from authentik.sources.kerberos.models import KerberosSource
from authentik.sources.kerberos.sync import KerberosSync
from authentik.tenants.management import TenantCommand

LOGGER = get_logger()


class Command(TenantCommand):
    """Run sync for a Kerberos Source"""

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (``nargs="+"``, strings)."""
        parser.add_argument("source_slugs", type=str, nargs="+")

    def handle_per_tenant(self, **options):
        """Sync each Kerberos source whose slug is listed in ``options["source_slugs"]``.

        A slug with no matching ``KerberosSource`` is reported with a warning
        and skipped. For every source found, ``KerberosSync(source).sync()`` is
        run and its return value is logged as the number of synced users.
        """
        for slug in options["source_slugs"]:
            kerberos_source = KerberosSource.objects.filter(slug=slug).first()
            if kerberos_source:
                synced = KerberosSync(kerberos_source).sync()
                LOGGER.info(f"Synced {synced} users", slug=slug)
            else:
                LOGGER.warning("Source does not exist", slug=slug)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(TenantCommand):
    """Run sync for a Kerberos Source"""

    def add_arguments(self, parser):
        """Declare the positional ``source_slugs`` argument (string values, ``nargs="+"``)."""
        parser.add_argument("source_slugs", type=str, nargs="+")

    def handle_per_tenant(self, **options):
        """Run a Kerberos sync for every slug in ``options["source_slugs"]``.

        Slugs that match no ``KerberosSource`` produce a warning log entry and
        are skipped; otherwise the result of ``KerberosSync(...).sync()`` is
        logged together with the slug.
        """
        requested_slugs = options["source_slugs"]
        for current_slug in requested_slugs:
            candidates = KerberosSource.objects.filter(slug=current_slug)
            found = candidates.first()
            if not found:
                LOGGER.warning("Source does not exist", slug=current_slug)
                continue
            synced_total = KerberosSync(found).sync()
            LOGGER.info(f"Synced {synced_total} users", slug=current_slug)
Run sync for a Kerberos Source
def
handle_per_tenant(self, **options):
def handle_per_tenant(self, **options):
    """Sync every Kerberos source named in ``options["source_slugs"]``.

    For each slug the first matching ``KerberosSource`` is looked up. When
    none exists a warning is logged and the slug is skipped; otherwise
    ``KerberosSync(source).sync()`` runs and its return value is logged as
    the number of users synced.
    """
    for slug in options["source_slugs"]:
        matched = KerberosSource.objects.filter(slug=slug).first()
        if matched:
            synced = KerberosSync(matched).sync()
            LOGGER.info(f"Synced {synced} users", slug=slug)
        else:
            LOGGER.warning("Source does not exist", slug=slug)
The actual logic of the command.