authentik.endpoints.tasks

Endpoint tasks

 1"""Endpoint tasks"""
 2
 3from typing import Any
 4
 5from django.utils.translation import gettext_lazy as _
 6from dramatiq.actor import actor
 7from structlog.stdlib import get_logger
 8
 9from authentik.endpoints.controller import Capabilities
10from authentik.endpoints.models import Connector
11
# Module-level structlog logger shared by the tasks in this module.
LOGGER = get_logger()
13
14
@actor(description=_("Sync endpoints."))
def endpoints_sync(connector_pk: Any):
    """Sync endpoints for the connector identified by ``connector_pk``.

    The task returns without doing anything when no connector matches the
    given primary key, when the matched connector is not enabled, or when
    its controller does not list ``Capabilities.ENROLL_AUTOMATIC_API`` among
    its capabilities. Otherwise it logs the connector name and calls the
    controller's ``sync_endpoints()``.
    """
    # Resolve to the concrete connector subclass so the matching controller is used.
    candidates = Connector.objects.filter(pk=connector_pk).select_subclasses()
    connector: Connector | None = candidates.first()
    if not (connector and connector.enabled):
        return
    ctrl = connector.controller(connector)
    if Capabilities.ENROLL_AUTOMATIC_API in ctrl.capabilities():
        LOGGER.info("Syncing connector", connector=connector.name)
        ctrl.sync_endpoints()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>