authentik.tasks.signals

Signal receivers that publish Prometheus gauges for running workers and queued tasks.

 1"""admin signals"""
 2
 3from datetime import timedelta
 4
 5import pglock
 6from django.db.models import Count
 7from django.dispatch import receiver
 8from django.utils.timezone import now
 9from django_dramatiq_postgres.models import TaskState
10from packaging.version import parse
11from prometheus_client import Gauge
12
13from authentik import authentik_full_version
14from authentik.root.monitoring import monitoring_set
15from authentik.tasks.models import Task, WorkerStatus
16
# Worker gauge exported under the "authentik_admin_workers" name.
# NOTE(review): presumably kept so dashboards built on the previous metric name
# keep working; it is always written together with GAUGE_WORKERS below.
OLD_GAUGE_WORKERS = Gauge(
    name="authentik_admin_workers",
    documentation=(
        "Currently connected workers, their versions and if they are the same version as authentik"
    ),
    labelnames=["version", "version_matched"],
)
# Worker gauge exported under the "authentik_tasks_workers" name.
GAUGE_WORKERS = Gauge(
    name="authentik_tasks_workers",
    documentation=(
        "Currently connected workers, their versions and if they are the same version as authentik"
    ),
    labelnames=["version", "version_matched"],
)
# Number of queued tasks, one series per (queue, actor) pair.
GAUGE_TASKS_QUEUED = Gauge(
    name="authentik_tasks_queued",
    documentation="The number of tasks in queue.",
    labelnames=["queue_name", "actor_name"],
)


# Version of this process, parsed once at import time so the receivers can
# compare it against the version string each worker reports.
_version = parse(authentik_full_version())
35
36
# Label sets this process has already published on the worker gauges. A
# labelled gauge series keeps its last value until it is written again, so the
# receiver has to remember which series to reset once their workers are gone.
_reported_worker_labels = set()


@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
    """Set worker gauge

    Counts, per reported version, the workers seen within the last two minutes
    whose advisory lock is currently held, and writes those counts to both
    worker gauges. Series written by an earlier call that no longer have a
    running worker are reset to 0 instead of keeping their last count.

    Args:
        sender: Sender of the ``monitoring_set`` signal (unused).
        **kwargs: Additional signal arguments (unused).
    """
    worker_version_count = {}
    recently_seen = WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2))
    for status in recently_seen:
        lock_id = f"goauthentik.io/worker/status/{status.pk}"
        with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
            # The worker doesn't hold the lock, it isn't running
            if acquired:
                continue
            version_matching = parse(status.version) == _version
            worker_version_count.setdefault(
                status.version, {"count": 0, "matching": version_matching}
            )
            worker_version_count[status.version]["count"] += 1

    current_labels = {
        (version, stats["matching"]) for version, stats in worker_version_count.items()
    }
    # Without this reset a version whose last worker stopped would keep
    # reporting its previous, non-zero count on every later scrape.
    for stale_labels in _reported_worker_labels - current_labels:
        OLD_GAUGE_WORKERS.labels(*stale_labels).set(0)
        GAUGE_WORKERS.labels(*stale_labels).set(0)
    _reported_worker_labels.update(current_labels)

    for version, stats in worker_version_count.items():
        OLD_GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
        GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
55
56
@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
    """Set number of queued tasks

    Writes, for every (queue_name, actor_name) pair that has at least one task
    row, the number of those tasks that are in the QUEUED state; pairs without
    queued tasks are written as 0. A single grouped query is used and each
    series is written exactly once, so a series never shows an intermediate 0
    before its real count is set.

    Args:
        sender: Sender of the ``monitoring_set`` signal (unused).
        **kwargs: Additional signal arguments (unused).
    """
    # Imported locally: Q is only needed for the conditional aggregate below.
    from django.db.models import Q

    queued_per_actor = Task.objects.values("queue_name", "actor_name").annotate(
        count=Count("pk", filter=Q(state=TaskState.QUEUED))
    )
    for stats in queued_per_actor:
        GAUGE_TASKS_QUEUED.labels(stats["queue_name"], stats["actor_name"]).set(stats["count"])
OLD_GAUGE_WORKERS = prometheus_client.metrics.Gauge(authentik_admin_workers)
GAUGE_WORKERS = prometheus_client.metrics.Gauge(authentik_tasks_workers)
GAUGE_TASKS_QUEUED = prometheus_client.metrics.Gauge(authentik_tasks_queued)
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
# Label sets this process has already published on the worker gauges. A
# labelled gauge series keeps its last value until it is written again, so the
# receiver has to remember which series to reset once their workers are gone.
_reported_worker_labels = set()


@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
    """Set worker gauge

    Counts, per reported version, the workers seen within the last two minutes
    whose advisory lock is currently held, and writes those counts to both
    worker gauges. Series written by an earlier call that no longer have a
    running worker are reset to 0 instead of keeping their last count.

    Args:
        sender: Sender of the ``monitoring_set`` signal (unused).
        **kwargs: Additional signal arguments (unused).
    """
    worker_version_count = {}
    recently_seen = WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2))
    for status in recently_seen:
        lock_id = f"goauthentik.io/worker/status/{status.pk}"
        with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
            # The worker doesn't hold the lock, it isn't running
            if acquired:
                continue
            version_matching = parse(status.version) == _version
            worker_version_count.setdefault(
                status.version, {"count": 0, "matching": version_matching}
            )
            worker_version_count[status.version]["count"] += 1

    current_labels = {
        (version, stats["matching"]) for version, stats in worker_version_count.items()
    }
    # Without this reset a version whose last worker stopped would keep
    # reporting its previous, non-zero count on every later scrape.
    for stale_labels in _reported_worker_labels - current_labels:
        OLD_GAUGE_WORKERS.labels(*stale_labels).set(0)
        GAUGE_WORKERS.labels(*stale_labels).set(0)
    _reported_worker_labels.update(current_labels)

    for version, stats in worker_version_count.items():
        OLD_GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
        GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])

Set worker gauge

@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
    """Set number of queued tasks

    Writes, for every (queue_name, actor_name) pair that has at least one task
    row, the number of those tasks that are in the QUEUED state; pairs without
    queued tasks are written as 0. A single grouped query is used and each
    series is written exactly once, so a series never shows an intermediate 0
    before its real count is set.

    Args:
        sender: Sender of the ``monitoring_set`` signal (unused).
        **kwargs: Additional signal arguments (unused).
    """
    # Imported locally: Q is only needed for the conditional aggregate below.
    from django.db.models import Q

    queued_per_actor = Task.objects.values("queue_name", "actor_name").annotate(
        count=Count("pk", filter=Q(state=TaskState.QUEUED))
    )
    for stats in queued_per_actor:
        GAUGE_TASKS_QUEUED.labels(stats["queue_name"], stats["actor_name"]).set(stats["count"])

Set number of queued tasks