authentik.endpoints.tasks
Endpoint tasks
1"""Endpoint tasks""" 2 3from typing import Any 4 5from django.utils.translation import gettext_lazy as _ 6from dramatiq.actor import actor 7from structlog.stdlib import get_logger 8 9from authentik.endpoints.controller import Capabilities 10from authentik.endpoints.models import Connector 11 12LOGGER = get_logger() 13 14 15@actor(description=_("Sync endpoints.")) 16def endpoints_sync(connector_pk: Any): 17 connector: Connector | None = ( 18 Connector.objects.filter(pk=connector_pk).select_subclasses().first() 19 ) 20 if not connector or not connector.enabled: 21 return 22 controller = connector.controller 23 ctrl = controller(connector) 24 if Capabilities.ENROLL_AUTOMATIC_API not in ctrl.capabilities(): 25 return 26 LOGGER.info("Syncing connector", connector=connector.name) 27 ctrl.sync_endpoints()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>