authentik.tasks.signals
admin signals
"""Signal receivers that publish worker and task-queue Prometheus gauges"""

from datetime import timedelta

from django.db.models import Count
from django.dispatch import receiver
from django.utils.timezone import now
from django_dramatiq_postgres.models import TaskState
from packaging.version import parse
from prometheus_client import Gauge

from authentik import authentik_full_version
from authentik.root.monitoring import monitoring_set
from authentik.tasks.models import Task, WorkerStatus

# Same help text and labels as GAUGE_WORKERS, registered under a second metric name.
# NOTE(review): presumably retained so consumers of the older name keep working — confirm.
OLD_GAUGE_WORKERS = Gauge(
    "authentik_admin_workers",
    "Currently connected workers, their versions and if they are the same version as authentik",
    ["version", "version_matched"],
)
GAUGE_WORKERS = Gauge(
    "authentik_tasks_workers",
    "Currently connected workers, their versions and if they are the same version as authentik",
    ["version", "version_matched"],
)
GAUGE_TASKS_QUEUED = Gauge(
    "authentik_tasks_queued",
    "The number of tasks in queue.",
    ["queue_name", "actor_name"],
)


@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
    """Publish the number of recently seen workers per reported version.

    Workers whose ``last_seen`` is within the last 45 seconds are grouped by
    version. Each group's size is written to both worker gauges, labelled with
    the version and whether it parses equal to this instance's version.
    """
    tally = {}
    our_version = parse(authentik_full_version())
    cutoff = now() - timedelta(seconds=45)
    recent_workers = WorkerStatus.objects.filter(last_seen__gt=cutoff)
    for worker in recent_workers:
        matches = parse(worker.version) == our_version
        # The first worker seen for a version creates that version's entry.
        entry = tally.setdefault(worker.version, {"count": 0, "matching": matches})
        entry["count"] += 1
    for version, entry in tally.items():
        label_values = (version, entry["matching"])
        count = entry["count"]
        OLD_GAUGE_WORKERS.labels(*label_values).set(count)
        GAUGE_WORKERS.labels(*label_values).set(count)


@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
    """Publish the number of queued tasks per queue name and actor name."""
    # Zero every known (queue, actor) pair first, so pairs that currently have
    # nothing queued report 0 instead of keeping an earlier value.
    known_pairs = Task.objects.values("queue_name", "actor_name").distinct()
    for pair in known_pairs:
        GAUGE_TASKS_QUEUED.labels(pair["queue_name"], pair["actor_name"]).set(0)

    # Then overwrite the pairs that do have tasks in the QUEUED state.
    queued = Task.objects.filter(state=TaskState.QUEUED)
    per_pair = queued.values("queue_name", "actor_name").annotate(count=Count("pk"))
    for row in per_pair:
        GAUGE_TASKS_QUEUED.labels(row["queue_name"], row["actor_name"]).set(row["count"])
OLD_GAUGE_WORKERS =
prometheus_client.metrics.Gauge(authentik_admin_workers)
GAUGE_WORKERS =
prometheus_client.metrics.Gauge(authentik_tasks_workers)
GAUGE_TASKS_QUEUED =
prometheus_client.metrics.Gauge(authentik_tasks_queued)
@receiver(monitoring_set)
def
monitoring_set_workers(sender, **kwargs):
@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
    """Publish the number of recently seen workers per reported version.

    Workers whose ``last_seen`` is within the last 45 seconds are grouped by
    version. Each group's size is written to both worker gauges, labelled with
    the version and whether it parses equal to this instance's version.
    """
    tally = {}
    our_version = parse(authentik_full_version())
    cutoff = now() - timedelta(seconds=45)
    recent_workers = WorkerStatus.objects.filter(last_seen__gt=cutoff)
    for worker in recent_workers:
        matches = parse(worker.version) == our_version
        # The first worker seen for a version creates that version's entry.
        entry = tally.setdefault(worker.version, {"count": 0, "matching": matches})
        entry["count"] += 1
    for version, entry in tally.items():
        label_values = (version, entry["matching"])
        count = entry["count"]
        OLD_GAUGE_WORKERS.labels(*label_values).set(count)
        GAUGE_WORKERS.labels(*label_values).set(count)
Set worker gauge
@receiver(monitoring_set)
def
monitoring_set_queued_tasks(sender, **kwargs):
@receiver(monitoring_set)
def monitoring_set_queued_tasks(sender, **kwargs):
    """Publish the number of queued tasks per queue name and actor name."""
    # Zero every known (queue, actor) pair first, so pairs that currently have
    # nothing queued report 0 instead of keeping an earlier value.
    known_pairs = Task.objects.values("queue_name", "actor_name").distinct()
    for pair in known_pairs:
        GAUGE_TASKS_QUEUED.labels(pair["queue_name"], pair["actor_name"]).set(0)

    # Then overwrite the pairs that do have tasks in the QUEUED state.
    queued = Task.objects.filter(state=TaskState.QUEUED)
    per_pair = queued.values("queue_name", "actor_name").annotate(count=Count("pk"))
    for row in per_pair:
        GAUGE_TASKS_QUEUED.labels(row["queue_name"], row["actor_name"]).set(row["count"])
Set number of queued tasks