authentik.tasks.api.workers
1import pglock 2from django.utils.timezone import now, timedelta 3from drf_spectacular.utils import extend_schema, inline_serializer 4from packaging.version import parse 5from rest_framework.fields import BooleanField, CharField 6from rest_framework.request import Request 7from rest_framework.response import Response 8from rest_framework.views import APIView 9 10from authentik import authentik_full_version 11from authentik.rbac.permissions import HasPermission 12from authentik.tasks.models import WorkerStatus 13 14 15class WorkerView(APIView): 16 """Get currently connected worker count.""" 17 18 permission_classes = [HasPermission("authentik_rbac.view_system_info")] 19 20 @extend_schema( 21 responses=inline_serializer( 22 "Worker", 23 fields={ 24 "worker_id": CharField(), 25 "version": CharField(), 26 "version_matching": BooleanField(), 27 }, 28 many=True, 29 ) 30 ) 31 def get(self, request: Request) -> Response: 32 response = [] 33 our_version = parse(authentik_full_version()) 34 for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)): 35 lock_id = f"goauthentik.io/worker/status/{status.pk}" 36 with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired: 37 # The worker doesn't hold the lock, it isn't running 38 if acquired: 39 continue 40 version_matching = parse(status.version) == our_version 41 response.append( 42 { 43 "worker_id": f"{status.pk}@{status.hostname}", 44 "version": status.version, 45 "version_matching": version_matching, 46 } 47 ) 48 return Response(response)
class
WorkerView(rest_framework.views.APIView):
16class WorkerView(APIView): 17 """Get currently connected worker count.""" 18 19 permission_classes = [HasPermission("authentik_rbac.view_system_info")] 20 21 @extend_schema( 22 responses=inline_serializer( 23 "Worker", 24 fields={ 25 "worker_id": CharField(), 26 "version": CharField(), 27 "version_matching": BooleanField(), 28 }, 29 many=True, 30 ) 31 ) 32 def get(self, request: Request) -> Response: 33 response = [] 34 our_version = parse(authentik_full_version()) 35 for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)): 36 lock_id = f"goauthentik.io/worker/status/{status.pk}" 37 with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired: 38 # The worker doesn't hold the lock, it isn't running 39 if acquired: 40 continue 41 version_matching = parse(status.version) == our_version 42 response.append( 43 { 44 "worker_id": f"{status.pk}@{status.hostname}", 45 "version": status.version, 46 "version_matching": version_matching, 47 } 48 ) 49 return Response(response)
Get currently connected worker count.
permission_classes =
[<class 'authentik.rbac.permissions.HasPermission.<locals>.checker'>]
@extend_schema(responses=inline_serializer('Worker', fields={'worker_id': CharField(), 'version': CharField(), 'version_matching': BooleanField()}, many=True))
def
get( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
21 @extend_schema( 22 responses=inline_serializer( 23 "Worker", 24 fields={ 25 "worker_id": CharField(), 26 "version": CharField(), 27 "version_matching": BooleanField(), 28 }, 29 many=True, 30 ) 31 ) 32 def get(self, request: Request) -> Response: 33 response = [] 34 our_version = parse(authentik_full_version()) 35 for status in WorkerStatus.objects.filter(last_seen__gt=now() - timedelta(minutes=2)): 36 lock_id = f"goauthentik.io/worker/status/{status.pk}" 37 with pglock.advisory(lock_id, timeout=0, side_effect=pglock.Return) as acquired: 38 # The worker doesn't hold the lock, it isn't running 39 if acquired: 40 continue 41 version_matching = parse(status.version) == our_version 42 response.append( 43 { 44 "worker_id": f"{status.pk}@{status.hostname}", 45 "version": status.version, 46 "version_matching": version_matching, 47 } 48 ) 49 return Response(response)