authentik.tasks.schedules.scheduler

 1import pglock
 2from django_dramatiq_postgres.scheduler import Scheduler as SchedulerBase
 3from structlog.stdlib import get_logger
 4
 5from authentik.tenants.models import Tenant
 6
 7LOGGER = get_logger()  # module-level structlog logger; the Scheduler code shown here logs via self.logger instead
 8
 9
class Scheduler(SchedulerBase):
    """Scheduler that performs one scheduling pass per ready tenant.

    Each tenant's pass is guarded by its own PostgreSQL advisory lock so that
    concurrent scheduler processes do not schedule the same tenant at once.
    """

    def _lock(self, tenant: Tenant) -> pglock.advisory:
        """Return the advisory lock that guards scheduling for ``tenant``.

        The lock id embeds ``tenant.schema_name``, so every tenant has its own
        lock. The lock is created with ``timeout=0`` and
        ``side_effect=pglock.Return``; ``run`` treats the value yielded on
        entering the context as an "acquired" flag.
        """
        return pglock.advisory(
            lock_id=f"authentik.scheduler/{tenant.schema_name}",
            side_effect=pglock.Return,
            timeout=0,
        )

    def run(self) -> None:
        """Run the scheduling pass for every tenant marked ``ready``.

        For each tenant the tenant context is entered and its advisory lock is
        requested. A tenant whose lock cannot be acquired is skipped; the
        remaining tenants are still processed.
        """
        for tenant in Tenant.objects.filter(ready=True):
            with tenant, self._lock(tenant) as lock_acquired:
                if not lock_acquired:
                    self.logger.debug("Could not acquire lock, skipping scheduling")
                    # The lock is per tenant, so contention on this one says
                    # nothing about the others: skip it instead of aborting.
                    continue
                # `_run` is not defined in this class; presumably inherited
                # from SchedulerBase. Its result is logged as the task count.
                count = self._run()
                self.logger.info(f"Sent {count} scheduled tasks")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class Scheduler(django_dramatiq_postgres.scheduler.Scheduler):
class Scheduler(SchedulerBase):
    """Scheduler that performs one scheduling pass per ready tenant.

    Each tenant's pass is guarded by its own PostgreSQL advisory lock so that
    concurrent scheduler processes do not schedule the same tenant at once.
    """

    def _lock(self, tenant: Tenant) -> pglock.advisory:
        """Return the advisory lock that guards scheduling for ``tenant``.

        The lock id embeds ``tenant.schema_name``, so every tenant has its own
        lock. The lock is created with ``timeout=0`` and
        ``side_effect=pglock.Return``; ``run`` treats the value yielded on
        entering the context as an "acquired" flag.
        """
        return pglock.advisory(
            lock_id=f"authentik.scheduler/{tenant.schema_name}",
            side_effect=pglock.Return,
            timeout=0,
        )

    def run(self) -> None:
        """Run the scheduling pass for every tenant marked ``ready``.

        For each tenant the tenant context is entered and its advisory lock is
        requested. A tenant whose lock cannot be acquired is skipped; the
        remaining tenants are still processed.
        """
        for tenant in Tenant.objects.filter(ready=True):
            with tenant, self._lock(tenant) as lock_acquired:
                if not lock_acquired:
                    self.logger.debug("Could not acquire lock, skipping scheduling")
                    # The lock is per tenant, so contention on this one says
                    # nothing about the others: skip it instead of aborting.
                    continue
                # `_run` is not defined in this class; presumably inherited
                # from SchedulerBase. Its result is logged as the task count.
                count = self._run()
                self.logger.info(f"Sent {count} scheduled tasks")
def run(self):
19    def run(self):
20        for tenant in Tenant.objects.filter(ready=True):
21            with tenant:
22                with self._lock(tenant) as lock_acquired:
23                    if not lock_acquired:
24                        self.logger.debug("Could not acquire lock, skipping scheduling")
25                        return
26                    count = self._run()
27                    self.logger.info(f"Sent {count} scheduled tasks")