authentik.providers.scim.management.commands.scim_sync
SCIM Sync
"""SCIM Sync"""

from structlog.stdlib import get_logger

from authentik.providers.scim.models import SCIMProvider
from authentik.tenants.management import TenantCommand

LOGGER = get_logger()


class Command(TenantCommand):
    """Run sync for an SCIM Provider

    Accepts one or more provider names and triggers every schedule attached
    to each provider that can be found by name.
    """

    def add_arguments(self, parser):
        """Register the positional ``providers`` argument (one or more names)."""
        parser.add_argument(
            "providers",
            nargs="+",
            type=str,
        )

    def handle_per_tenant(self, **options):
        """Send every schedule of each named SCIM provider and fetch its result.

        ``options["providers"]`` holds the provider names to process. A name
        with no matching ``SCIMProvider`` is logged as a warning and skipped.
        """
        for requested_name in options["providers"]:
            # Only the first provider matching the name is used.
            match = SCIMProvider.objects.filter(name=requested_name).first()
            if match:
                for sched in match.schedules.all():
                    message = sched.send()
                    # NOTE(review): presumably waits for the dispatched run — confirm.
                    message.get_result()
            else:
                LOGGER.warning("Provider does not exist", name=requested_name)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Command(TenantCommand):
    """Run sync for an SCIM Provider

    Accepts one or more provider names and triggers every schedule attached
    to each provider that can be found by name.
    """

    def add_arguments(self, parser):
        """Register the positional ``providers`` argument (one or more names)."""
        parser.add_argument(
            "providers",
            nargs="+",
            type=str,
        )

    def handle_per_tenant(self, **options):
        """Send every schedule of each named SCIM provider and fetch its result.

        ``options["providers"]`` holds the provider names to process. A name
        with no matching ``SCIMProvider`` is logged as a warning and skipped.
        """
        for requested_name in options["providers"]:
            # Only the first provider matching the name is used.
            match = SCIMProvider.objects.filter(name=requested_name).first()
            if match:
                for sched in match.schedules.all():
                    message = sched.send()
                    # NOTE(review): presumably waits for the dispatched run — confirm.
                    message.get_result()
            else:
                LOGGER.warning("Provider does not exist", name=requested_name)
Run sync for an SCIM Provider
def
handle_per_tenant(self, **options):
def handle_per_tenant(self, **options):
    """Send every schedule of each named SCIM provider and fetch its result.

    ``options["providers"]`` holds the provider names to process. A name
    with no matching ``SCIMProvider`` is logged as a warning and skipped.
    """
    for requested_name in options["providers"]:
        # Only the first provider matching the name is used.
        match = SCIMProvider.objects.filter(name=requested_name).first()
        if match:
            for sched in match.schedules.all():
                message = sched.send()
                # NOTE(review): presumably waits for the dispatched run — confirm.
                message.get_result()
        else:
            LOGGER.warning("Provider does not exist", name=requested_name)
The actual logic of the command.