authentik.tasks.signals

Task signals: Prometheus gauges for connected workers and queued tasks

 1"""admin signals"""
 2
 3from datetime import timedelta
 4
 5from django.db.models import Count
 6from django.dispatch import receiver
 7from django.utils.timezone import now
 8from django_dramatiq_postgres.models import TaskState
 9from packaging.version import parse
10from prometheus_client import Gauge
11
12from authentik import authentik_full_version
13from authentik.root.monitoring import monitoring_set
14from authentik.tasks.models import Task, WorkerStatus
15
# Worker gauge exported under the "authentik_admin_workers" name.
# monitoring_set_workers sets it to the same values as GAUGE_WORKERS.
# NOTE(review): presumably kept so existing dashboards keep working -- confirm.
OLD_GAUGE_WORKERS = Gauge(
    "authentik_admin_workers",
    "Currently connected workers, their versions and if they are the same version as authentik",
    labelnames=["version", "version_matched"],
)

# Worker gauge exported under the "authentik_tasks_workers" name, labelled by
# worker version and whether that version equals the running authentik version.
GAUGE_WORKERS = Gauge(
    "authentik_tasks_workers",
    "Currently connected workers, their versions and if they are the same version as authentik",
    labelnames=["version", "version_matched"],
)

# Number of queued tasks, labelled by queue name and actor name.
GAUGE_TASKS_QUEUED = Gauge(
    "authentik_tasks_queued",
    "The number of tasks in queue.",
    labelnames=["queue_name", "actor_name"],
)
31
32
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
    """Update the worker gauges with the count of recently seen workers per version.

    Runs as a receiver of the ``monitoring_set`` signal; ``sender`` and
    ``kwargs`` are accepted for the signal interface but not used.

    Only ``WorkerStatus`` rows whose ``last_seen`` is within the last 45 seconds
    are counted. Counts are grouped by the raw version string, and each group is
    labelled with whether that version equals the running authentik version.
    """
    current_version = parse(authentik_full_version())
    cutoff = now() - timedelta(seconds=45)

    tally = {}
    for worker in WorkerStatus.objects.filter(last_seen__gt=cutoff):
        is_match = parse(worker.version) == current_version
        entry = tally.get(worker.version)
        if entry is None:
            # The "matching" flag is fixed by the first worker seen for a version.
            entry = tally[worker.version] = {"count": 0, "matching": is_match}
        entry["count"] += 1

    for version, entry in tally.items():
        # Both gauges receive the same labels and value.
        for gauge in (OLD_GAUGE_WORKERS, GAUGE_WORKERS):
            gauge.labels(version, entry["matching"]).set(entry["count"])
45
46
@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
    """Update the queued-tasks gauge for every queue/actor pair.

    Runs as a receiver of the ``monitoring_set`` signal; ``sender`` and
    ``kwargs`` are accepted for the signal interface but not used.
    """
    label_fields = ("queue_name", "actor_name")

    # First zero the gauge for every pair that has any task, so pairs with
    # nothing queued right now report 0 instead of keeping an earlier value.
    known_pairs = Task.objects.values(*label_fields).distinct()
    for row in known_pairs:
        GAUGE_TASKS_QUEUED.labels(row["queue_name"], row["actor_name"]).set(0)

    # Then overwrite with the actual count of tasks in the QUEUED state.
    queued_counts = (
        Task.objects.filter(state=TaskState.QUEUED)
        .values(*label_fields)
        .annotate(count=Count("pk"))
    )
    for row in queued_counts:
        GAUGE_TASKS_QUEUED.labels(row["queue_name"], row["actor_name"]).set(row["count"])
OLD_GAUGE_WORKERS = prometheus_client.metrics.Gauge(authentik_admin_workers)
GAUGE_WORKERS = prometheus_client.metrics.Gauge(authentik_tasks_workers)
GAUGE_TASKS_QUEUED = prometheus_client.metrics.Gauge(authentik_tasks_queued)
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
    """Update the worker gauges with the count of recently seen workers per version.

    Runs as a receiver of the ``monitoring_set`` signal; ``sender`` and
    ``kwargs`` are accepted for the signal interface but not used.

    Only ``WorkerStatus`` rows whose ``last_seen`` is within the last 45 seconds
    are counted. Counts are grouped by the raw version string, and each group is
    labelled with whether that version equals the running authentik version.
    """
    current_version = parse(authentik_full_version())
    cutoff = now() - timedelta(seconds=45)

    tally = {}
    for worker in WorkerStatus.objects.filter(last_seen__gt=cutoff):
        is_match = parse(worker.version) == current_version
        entry = tally.get(worker.version)
        if entry is None:
            # The "matching" flag is fixed by the first worker seen for a version.
            entry = tally[worker.version] = {"count": 0, "matching": is_match}
        entry["count"] += 1

    for version, entry in tally.items():
        # Both gauges receive the same labels and value.
        for gauge in (OLD_GAUGE_WORKERS, GAUGE_WORKERS):
            gauge.labels(version, entry["matching"]).set(entry["count"])

Set worker gauge

@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
    """Update the queued-tasks gauge for every queue/actor pair.

    Runs as a receiver of the ``monitoring_set`` signal; ``sender`` and
    ``kwargs`` are accepted for the signal interface but not used.
    """
    label_fields = ("queue_name", "actor_name")

    # First zero the gauge for every pair that has any task, so pairs with
    # nothing queued right now report 0 instead of keeping an earlier value.
    known_pairs = Task.objects.values(*label_fields).distinct()
    for row in known_pairs:
        GAUGE_TASKS_QUEUED.labels(row["queue_name"], row["actor_name"]).set(0)

    # Then overwrite with the actual count of tasks in the QUEUED state.
    queued_counts = (
        Task.objects.filter(state=TaskState.QUEUED)
        .values(*label_fields)
        .annotate(count=Count("pk"))
    )
    for row in queued_counts:
        GAUGE_TASKS_QUEUED.labels(row["queue_name"], row["actor_name"]).set(row["count"])

Set number of queued tasks