authentik.sources.kerberos.management.commands.kerberos_sync

Kerberos Sync

 1"""Kerberos Sync"""
 2
 3from structlog.stdlib import get_logger
 4
 5from authentik.sources.kerberos.models import KerberosSource
 6from authentik.sources.kerberos.sync import KerberosSync
 7from authentik.tenants.management import TenantCommand
 8
 9LOGGER = get_logger()  # module-level structlog logger used by the command below
10
11
class Command(TenantCommand):
    """Run sync for a Kerberos Source"""

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (one or more slugs)."""
        parser.add_argument(
            "source_slugs",
            type=str,
            nargs="+",
        )

    def handle_per_tenant(self, **options):
        """Sync each source named in ``options["source_slugs"]``.

        A slug that matches no ``KerberosSource`` is reported with a warning
        and skipped; the remaining slugs are still processed.
        """
        for slug in options["source_slugs"]:
            matches = KerberosSource.objects.filter(slug=slug)
            source = matches.first()
            if source:
                # The value returned by sync() is logged as the user count.
                user_count = KerberosSync(source).sync()
                LOGGER.info(f"Synced {user_count} users", slug=slug)
            else:
                LOGGER.warning("Source does not exist", slug=slug)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(authentik.tenants.management.TenantCommand):
class Command(TenantCommand):
    """Run sync for a Kerberos Source"""

    def add_arguments(self, parser):
        """Register the positional ``source_slugs`` argument (one or more slugs)."""
        parser.add_argument(
            "source_slugs",
            type=str,
            nargs="+",
        )

    def handle_per_tenant(self, **options):
        """Sync each source named in ``options["source_slugs"]``.

        A slug that matches no ``KerberosSource`` is reported with a warning
        and skipped; the remaining slugs are still processed.
        """
        for slug in options["source_slugs"]:
            matches = KerberosSource.objects.filter(slug=slug)
            source = matches.first()
            if source:
                # The value returned by sync() is logged as the user count.
                user_count = KerberosSync(source).sync()
                LOGGER.info(f"Synced {user_count} users", slug=slug)
            else:
                LOGGER.warning("Source does not exist", slug=slug)

Run sync for a Kerberos Source

def add_arguments(self, parser):
16    def add_arguments(self, parser):
17        parser.add_argument("source_slugs", nargs="+", type=str)

Entry point for subclassed commands to add custom arguments.

def handle_per_tenant(self, **options):
19    def handle_per_tenant(self, **options):
20        for source_slug in options["source_slugs"]:
21            source = KerberosSource.objects.filter(slug=source_slug).first()
22            if not source:
23                LOGGER.warning("Source does not exist", slug=source_slug)
24                continue
25            user_count = KerberosSync(source).sync()
26            LOGGER.info(f"Synced {user_count} users", slug=source_slug)

The actual logic of the command.