authentik.events.apps

authentik events app

 1"""authentik events app"""
 2
 3from prometheus_client import Gauge, Histogram
 4
 5from authentik.blueprints.apps import ManagedAppConfig
 6from authentik.lib.config import CONFIG, ENV_PREFIX
 7from authentik.lib.utils.time import fqdn_rand
 8from authentik.tasks.schedules.common import ScheduleSpec
 9
10# TODO: Deprecated metric - remove in 2024.2 or later
11GAUGE_TASKS = Gauge(
12    "authentik_system_tasks",
13    "System tasks and their status",
14    ["tenant", "task_name", "task_uid", "status"],
15)
16
17SYSTEM_TASK_TIME = Histogram(
18    "authentik_system_tasks_time_seconds",
19    "Runtime of system tasks",
20    ["tenant", "task_name", "task_uid"],
21)
22SYSTEM_TASK_STATUS = Gauge(
23    "authentik_system_tasks_status",
24    "System task status",
25    ["tenant", "task_name", "task_uid", "status"],
26)
27
28
29class AuthentikEventsConfig(ManagedAppConfig):
30    """authentik events app"""
31
32    name = "authentik.events"
33    label = "authentik_events"
34    verbose_name = "authentik Events"
35    default = True
36
37    @property
38    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
39        from authentik.events.tasks import notification_cleanup
40
41        return [
42            ScheduleSpec(
43                actor=notification_cleanup,
44                crontab=f"{fqdn_rand('notification_cleanup')} */8 * * *",
45            ),
46        ]
47
48    @ManagedAppConfig.reconcile_global
49    def check_deprecations(self):
50        """Check for config deprecations"""
51        from authentik.events.models import Event, EventAction
52
53        for key_replace, msg in CONFIG.deprecations.items():
54            key, replace = key_replace
55            key_env = f"{ENV_PREFIX}_{key.replace('.', '__')}".upper()
56            replace_env = f"{ENV_PREFIX}_{replace.replace('.', '__')}".upper()
57            if Event.objects.filter(
58                action=EventAction.CONFIGURATION_ERROR, context__deprecated_option=key
59            ).exists():
60                continue
61            Event.new(
62                EventAction.CONFIGURATION_ERROR,
63                deprecated_option=key,
64                deprecated_env=key_env,
65                replacement_option=replace,
66                replacement_env=replace_env,
67                message=msg,
68            ).save()
GAUGE_TASKS = prometheus_client.metrics.Gauge(authentik_system_tasks)
SYSTEM_TASK_TIME = prometheus_client.metrics.Histogram(authentik_system_tasks_time_seconds)
SYSTEM_TASK_STATUS = prometheus_client.metrics.Gauge(authentik_system_tasks_status)
class AuthentikEventsConfig(authentik.blueprints.apps.ManagedAppConfig):
30class AuthentikEventsConfig(ManagedAppConfig):
31    """authentik events app"""
32
33    name = "authentik.events"
34    label = "authentik_events"
35    verbose_name = "authentik Events"
36    default = True
37
38    @property
39    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
40        from authentik.events.tasks import notification_cleanup
41
42        return [
43            ScheduleSpec(
44                actor=notification_cleanup,
45                crontab=f"{fqdn_rand('notification_cleanup')} */8 * * *",
46            ),
47        ]
48
49    @ManagedAppConfig.reconcile_global
50    def check_deprecations(self):
51        """Check for config deprecations"""
52        from authentik.events.models import Event, EventAction
53
54        for key_replace, msg in CONFIG.deprecations.items():
55            key, replace = key_replace
56            key_env = f"{ENV_PREFIX}_{key.replace('.', '__')}".upper()
57            replace_env = f"{ENV_PREFIX}_{replace.replace('.', '__')}".upper()
58            if Event.objects.filter(
59                action=EventAction.CONFIGURATION_ERROR, context__deprecated_option=key
60            ).exists():
61                continue
62            Event.new(
63                EventAction.CONFIGURATION_ERROR,
64                deprecated_option=key,
65                deprecated_env=key_env,
66                replacement_option=replace,
67                replacement_env=replace_env,
68                message=msg,
69            ).save()

authentik events app

name = 'authentik.events'
label = 'authentik_events'
verbose_name = 'authentik Events'
default = True
tenant_schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
38    @property
39    def tenant_schedule_specs(self) -> list[ScheduleSpec]:
40        from authentik.events.tasks import notification_cleanup
41
42        return [
43            ScheduleSpec(
44                actor=notification_cleanup,
45                crontab=f"{fqdn_rand('notification_cleanup')} */8 * * *",
46            ),
47        ]

Get a list of schedule specs that must exist in each tenant

@ManagedAppConfig.reconcile_global
def check_deprecations(self):
49    @ManagedAppConfig.reconcile_global
50    def check_deprecations(self):
51        """Check for config deprecations"""
52        from authentik.events.models import Event, EventAction
53
54        for key_replace, msg in CONFIG.deprecations.items():
55            key, replace = key_replace
56            key_env = f"{ENV_PREFIX}_{key.replace('.', '__')}".upper()
57            replace_env = f"{ENV_PREFIX}_{replace.replace('.', '__')}".upper()
58            if Event.objects.filter(
59                action=EventAction.CONFIGURATION_ERROR, context__deprecated_option=key
60            ).exists():
61                continue
62            Event.new(
63                EventAction.CONFIGURATION_ERROR,
64                deprecated_option=key,
65                deprecated_env=key_env,
66                replacement_option=replace,
67                replacement_env=replace_env,
68                message=msg,
69            ).save()

Check for config deprecations