authentik.tasks.schedules.apps

 1from authentik.blueprints.apps import ManagedAppConfig
 2from authentik.lib.utils.reflection import get_apps
 3from authentik.tasks.schedules.common import ScheduleSpec
 4
 5
 6class AuthentikTasksSchedulesConfig(ManagedAppConfig):
 7    name = "authentik.tasks.schedules"
 8    label = "authentik_tasks_schedules"
 9    verbose_name = "authentik Tasks Schedules"
10    default = True
11
12    @property
13    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
14        from authentik.tasks.schedules.models import ScheduledModel
15
16        schedules = []
17        for Model in ScheduledModel.models():
18            for obj in Model.objects.all():
19                for spec in obj.schedule_specs:
20                    spec.rel_obj = obj
21                    spec.identifier = obj.pk
22                    schedules.append(spec)
23        return schedules
24
25    def _reconcile_schedules(self, specs: list[ScheduleSpec]):
26        from django.db import transaction
27
28        from authentik.tasks.schedules.models import Schedule
29
30        schedules_to_send = []
31        with transaction.atomic():
32            pks_to_keep = []
33            for spec in specs:
34                schedule = spec.update_or_create()
35                pks_to_keep.append(schedule.pk)
36                if spec.send_on_startup:
37                    schedules_to_send.append(schedule)
38            Schedule.objects.exclude(pk__in=pks_to_keep).delete()
39        for schedule in schedules_to_send:
40            schedule.send()
41
42    @ManagedAppConfig.reconcile_tenant
43    def reconcile_tenant_schedules(self):
44        from authentik.tenants.utils import get_current_tenant, get_public_schema_name
45
46        schedule_specs = []
47        for app in get_apps():
48            schedule_specs.extend(app.tenant_schedule_specs)
49            if get_current_tenant().schema_name == get_public_schema_name():
50                schedule_specs.extend(app.global_schedule_specs)
51        self._reconcile_schedules(schedule_specs)
class AuthentikTasksSchedulesConfig(authentik.blueprints.apps.ManagedAppConfig):
 7class AuthentikTasksSchedulesConfig(ManagedAppConfig):
 8    name = "authentik.tasks.schedules"
 9    label = "authentik_tasks_schedules"
10    verbose_name = "authentik Tasks Schedules"
11    default = True
12
13    @property
14    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
15        from authentik.tasks.schedules.models import ScheduledModel
16
17        schedules = []
18        for Model in ScheduledModel.models():
19            for obj in Model.objects.all():
20                for spec in obj.schedule_specs:
21                    spec.rel_obj = obj
22                    spec.identifier = obj.pk
23                    schedules.append(spec)
24        return schedules
25
26    def _reconcile_schedules(self, specs: list[ScheduleSpec]):
27        from django.db import transaction
28
29        from authentik.tasks.schedules.models import Schedule
30
31        schedules_to_send = []
32        with transaction.atomic():
33            pks_to_keep = []
34            for spec in specs:
35                schedule = spec.update_or_create()
36                pks_to_keep.append(schedule.pk)
37                if spec.send_on_startup:
38                    schedules_to_send.append(schedule)
39            Schedule.objects.exclude(pk__in=pks_to_keep).delete()
40        for schedule in schedules_to_send:
41            schedule.send()
42
43    @ManagedAppConfig.reconcile_tenant
44    def reconcile_tenant_schedules(self):
45        from authentik.tenants.utils import get_current_tenant, get_public_schema_name
46
47        schedule_specs = []
48        for app in get_apps():
49            schedule_specs.extend(app.tenant_schedule_specs)
50            if get_current_tenant().schema_name == get_public_schema_name():
51                schedule_specs.extend(app.global_schedule_specs)
52        self._reconcile_schedules(schedule_specs)

Basic reconciliation logic for apps

label = 'authentik_tasks_schedules'
verbose_name = 'authentik Tasks Schedules'
default = True
tenant_schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
13    @property
14    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
15        from authentik.tasks.schedules.models import ScheduledModel
16
17        schedules = []
18        for Model in ScheduledModel.models():
19            for obj in Model.objects.all():
20                for spec in obj.schedule_specs:
21                    spec.rel_obj = obj
22                    spec.identifier = obj.pk
23                    schedules.append(spec)
24        return schedules

Get a list of schedule specs that must exist in each tenant

@ManagedAppConfig.reconcile_tenant
def reconcile_tenant_schedules(self):
43    @ManagedAppConfig.reconcile_tenant
44    def reconcile_tenant_schedules(self):
45        from authentik.tenants.utils import get_current_tenant, get_public_schema_name
46
47        schedule_specs = []
48        for app in get_apps():
49            schedule_specs.extend(app.tenant_schedule_specs)
50            if get_current_tenant().schema_name == get_public_schema_name():
51                schedule_specs.extend(app.global_schedule_specs)
52        self._reconcile_schedules(schedule_specs)