authentik.tasks.api.workers

 1import pglock
 2from django.utils.timezone import now, timedelta
 3from drf_spectacular.utils import extend_schema, inline_serializer
 4from packaging.version import parse
 5from rest_framework.fields import BooleanField, CharField
 6from rest_framework.request import Request
 7from rest_framework.response import Response
 8from rest_framework.views import APIView
 9
10from authentik import authentik_full_version
11from authentik.rbac.permissions import HasPermission
12from authentik.tasks.models import WorkerStatus
13
14
15class WorkerView(APIView):
16    """Get currently connected worker count."""
17
18    permission_classes = [HasPermission("authentik_rbac.view_system_info")]
19
20    @extend_schema(
21        responses=inline_serializer(
22            "Worker",
23            fields={
24                "worker_id": CharField(),
25                "version": CharField(),
26                "version_matching": BooleanField(),
27            },
28            many=True,
29        )
30    )
31    def get(self, request: Request) -> Response:
32        response = []
33        our_version = parse(authentik_full_version())
34        for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
35            lock_id = f"goauthentik.io/worker/status/{status.pk}"
36            with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
37                # The worker doesn't hold the lock, it isn't running
38                if acquired:
39                    continue
40                version_matching = parse(status.version) == our_version
41                response.append(
42                    {
43                        "worker_id": f"{status.pk}@{status.hostname}",
44                        "version": status.version,
45                        "version_matching": version_matching,
46                    }
47                )
48        return Response(response)
class WorkerView(rest_framework.views.APIView):
16class WorkerView(APIView):
17    """Get currently connected worker count."""
18
19    permission_classes = [HasPermission("authentik_rbac.view_system_info")]
20
21    @extend_schema(
22        responses=inline_serializer(
23            "Worker",
24            fields={
25                "worker_id": CharField(),
26                "version": CharField(),
27                "version_matching": BooleanField(),
28            },
29            many=True,
30        )
31    )
32    def get(self, request: Request) -> Response:
33        response = []
34        our_version = parse(authentik_full_version())
35        for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
36            lock_id = f"goauthentik.io/worker/status/{status.pk}"
37            with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
38                # The worker doesn't hold the lock, it isn't running
39                if acquired:
40                    continue
41                version_matching = parse(status.version) == our_version
42                response.append(
43                    {
44                        "worker_id": f"{status.pk}@{status.hostname}",
45                        "version": status.version,
46                        "version_matching": version_matching,
47                    }
48                )
49        return Response(response)

Get currently connected worker count.

permission_classes = [<class 'authentik.rbac.permissions.HasPermission.<locals>.checker'>]
@extend_schema(responses=inline_serializer('Worker', fields={'worker_id': CharField(), 'version': CharField(), 'version_matching': BooleanField()}, many=True))
def get( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
21    @extend_schema(
22        responses=inline_serializer(
23            "Worker",
24            fields={
25                "worker_id": CharField(),
26                "version": CharField(),
27                "version_matching": BooleanField(),
28            },
29            many=True,
30        )
31    )
32    def get(self, request: Request) -> Response:
33        response = []
34        our_version = parse(authentik_full_version())
35        for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)):
36            lock_id = f"goauthentik.io/worker/status/{status.pk}"
37            with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired:
38                # The worker doesn't hold the lock, it isn't running
39                if acquired:
40                    continue
41                version_matching = parse(status.version) == our_version
42                response.append(
43                    {
44                        "worker_id": f"{status.pk}@{status.hostname}",
45                        "version": status.version,
46                        "version_matching": version_matching,
47                    }
48                )
49        return Response(response)