authentik.tasks.signals
admin signals
1"""admin signals""" 2 3from datetime import timedelta 4 5import pglock 6from django.db.models import Count 7from django.dispatch import receiver 8from django.utils.timezone import now 9from django_dramatiq_postgres.models import TaskState 10from packaging.version import parse 11from prometheus_client import Gauge 12 13from authentik import authentik_full_version 14from authentik.root.monitoring import monitoring_set 15from authentik.tasks.models import Task, WorkerStatus 16 17OLD_GAUGE_WORKERS = Gauge( 18 "authentik_admin_workers", 19 "Currently connected workers, their versions and if they are the same version as authentik", 20 ["version", "version_matched"], 21) 22GAUGE_WORKERS = Gauge( 23 "authentik_tasks_workers", 24 "Currently connected workers, their versions and if they are the same version as authentik", 25 ["version", "version_matched"], 26) 27GAUGE_TASKS_QUEUED = Gauge( 28 "authentik_tasks_queued", 29 "The number of tasks in queue.", 30 ["queue_name", "actor_name"], 31) 32 33 34_version = parse(authentik_full_version()) 35 36 37@receiver(monitoring_set) 38def monitoring_set_workers(sender, **kwargs): 39 """Set worker gauge""" 40 worker_version_count = {} 41 for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)): 42 lock_id = f"goauthentik.io/worker/status/{status.pk}" 43 with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired: 44 # The worker doesn't hold the lock, it isn't running 45 if acquired: 46 continue 47 version_matching = parse(status.version) == _version 48 worker_version_count.setdefault( 49 status.version, {"count": 0, "matching": version_matching} 50 ) 51 worker_version_count[status.version]["count"] += 1 52 for version, stats in worker_version_count.items(): 53 OLD_GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"]) 54 GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"]) 55 56 57@receiver(monitoring_set) 58def monitoring_set_queued_tasks(sender, **kwargs): 59 """Set number of queued tasks""" 60 for stats in Task.objects.values("queue_name", "actor_name").distinct(): 61 GAUGE_TASKS_QUEUED.labels(stats["queue_name"], stats["actor_name"]).set(0) 62 for stats in ( 63 Task.objects.filter(state=TaskState.QUEUED) 64 .values("queue_name", "actor_name") 65 .annotate(count=Count("pk")) 66 ): 67 GAUGE_TASKS_QUEUED.labels(stats["queue_name"], stats["actor_name"]).set(stats["count"])
OLD_GAUGE_WORKERS =
prometheus_client.metrics.Gauge(authentik_admin_workers)
GAUGE_WORKERS =
prometheus_client.metrics.Gauge(authentik_tasks_workers)
GAUGE_TASKS_QUEUED =
prometheus_client.metrics.Gauge(authentik_tasks_queued)
@receiver(monitoring_set)
def
monitoring_set_workers(sender, **kwargs):
38@receiver(monitoring_set) 39def monitoring_set_workers(sender, **kwargs): 40 """Set worker gauge""" 41 worker_version_count = {} 42 for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)): 43 lock_id = f"goauthentik.io/worker/status/{status.pk}" 44 with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired: 45 # The worker doesn't hold the lock, it isn't running 46 if acquired: 47 continue 48 version_matching = parse(status.version) == _version 49 worker_version_count.setdefault( 50 status.version, {"count": 0, "matching": version_matching} 51 ) 52 worker_version_count[status.version]["count"] += 1 53 for version, stats in worker_version_count.items(): 54 OLD_GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"]) 55 GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
Set worker gauge
@receiver(monitoring_set)
def
monitoring_set_queued_tasks(sender, **kwargs):
58@receiver(monitoring_set) 59def monitoring_set_queued_tasks(sender, **kwargs): 60 """Set number of queued tasks""" 61 for stats in Task.objects.values("queue_name", "actor_name").distinct(): 62 GAUGE_TASKS_QUEUED.labels(stats["queue_name"], stats["actor_name"]).set(0) 63 for stats in ( 64 Task.objects.filter(state=TaskState.QUEUED) 65 .values("queue_name", "actor_name") 66 .annotate(count=Count("pk")) 67 ): 68 GAUGE_TASKS_QUEUED.labels(stats["queue_name"], stats["actor_name"]).set(stats["count"])
Set number of queued tasks