authentik.tasks.apps
1from authentik.blueprints.apps import ManagedAppConfig 2from authentik.lib.utils.time import fqdn_rand 3from authentik.tasks.schedules.common import ScheduleSpec 4 5PRIORITY_HIGH = 1000 6 7 8class AuthentikTasksConfig(ManagedAppConfig): 9 name = "authentik.tasks" 10 label = "authentik_tasks" 11 verbose_name = "authentik Tasks" 12 default = True 13 14 @property 15 def global_schedule_specs(self) -> list[ScheduleSpec]: 16 from authentik.tasks.tasks import clean_worker_statuses 17 18 return [ 19 ScheduleSpec( 20 actor=clean_worker_statuses, 21 crontab=f"{fqdn_rand('clean_worker_statuses')} {fqdn_rand('clean_worker_statuses', 24)} * * *", # noqa: E501 22 ), 23 ]
PRIORITY_HIGH =
1000
9class AuthentikTasksConfig(ManagedAppConfig): 10 name = "authentik.tasks" 11 label = "authentik_tasks" 12 verbose_name = "authentik Tasks" 13 default = True 14 15 @property 16 def global_schedule_specs(self) -> list[ScheduleSpec]: 17 from authentik.tasks.tasks import clean_worker_statuses 18 19 return [ 20 ScheduleSpec( 21 actor=clean_worker_statuses, 22 crontab=f"{fqdn_rand('clean_worker_statuses')} {fqdn_rand('clean_worker_statuses', 24)} * * *", # noqa: E501 23 ), 24 ]
Basic reconciliation logic for apps
name =
'authentik.tasks'
global_schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
15 @property 16 def global_schedule_specs(self) -> list[ScheduleSpec]: 17 from authentik.tasks.tasks import clean_worker_statuses 18 19 return [ 20 ScheduleSpec( 21 actor=clean_worker_statuses, 22 crontab=f"{fqdn_rand('clean_worker_statuses')} {fqdn_rand('clean_worker_statuses', 24)} * * *", # noqa: E501 23 ), 24 ]
Get a list of schedule specs that must exist in the default tenant