authentik.sources.kerberos.api.source

Source API Views

  1"""Source API Views"""
  2
  3from django.core.cache import cache
  4from drf_spectacular.utils import extend_schema
  5from rest_framework.decorators import action
  6from rest_framework.fields import SerializerMethodField
  7from rest_framework.request import Request
  8from rest_framework.response import Response
  9from rest_framework.viewsets import ModelViewSet
 10
 11from authentik.core.api.sources import SourceSerializer
 12from authentik.core.api.used_by import UsedByMixin
 13from authentik.lib.sync.api import SyncStatusSerializer
 14from authentik.rbac.filters import ObjectFilter
 15from authentik.sources.kerberos.models import KerberosSource
 16from authentik.sources.kerberos.tasks import CACHE_KEY_STATUS, kerberos_sync
 17from authentik.tasks.models import Task, TaskStatus
 18
 19
 20class KerberosSourceSerializer(SourceSerializer):
 21    """Kerberos Source Serializer"""
 22
 23    connectivity = SerializerMethodField()
 24
 25    def get_connectivity(self, source: KerberosSource) -> dict[str, str] | None:
 26        """Get cached source connectivity"""
 27        return cache.get(CACHE_KEY_STATUS + source.slug, None)
 28
 29    class Meta:
 30        model = KerberosSource
 31        fields = SourceSerializer.Meta.fields + [
 32            "group_matching_mode",
 33            "realm",
 34            "krb5_conf",
 35            "kadmin_type",
 36            "sync_users",
 37            "sync_users_password",
 38            "sync_principal",
 39            "sync_password",
 40            "sync_keytab",
 41            "sync_ccache",
 42            "connectivity",
 43            "spnego_server_name",
 44            "spnego_keytab",
 45            "spnego_ccache",
 46            "password_login_update_internal_password",
 47            "sync_outgoing_trigger_mode",
 48        ]
 49        extra_kwargs = {
 50            "sync_password": {"write_only": True},
 51            "sync_keytab": {"write_only": True},
 52            "spnego_keytab": {"write_only": True},
 53        }
 54
 55
 56class KerberosSourceViewSet(UsedByMixin, ModelViewSet):
 57    """Kerberos Source Viewset"""
 58
 59    queryset = KerberosSource.objects.all()
 60    serializer_class = KerberosSourceSerializer
 61    lookup_field = "slug"
 62    filterset_fields = [
 63        "pbm_uuid",
 64        "name",
 65        "slug",
 66        "enabled",
 67        "realm",
 68        "kadmin_type",
 69        "sync_users",
 70        "sync_users_password",
 71        "sync_principal",
 72        "spnego_server_name",
 73        "password_login_update_internal_password",
 74    ]
 75    search_fields = [
 76        "name",
 77        "slug",
 78        "realm",
 79        "krb5_conf",
 80        "sync_principal",
 81        "spnego_server_name",
 82    ]
 83    ordering = ["name"]
 84
 85    @extend_schema(responses={200: SyncStatusSerializer()})
 86    @action(
 87        methods=["GET"],
 88        detail=True,
 89        pagination_class=None,
 90        url_path="sync/status",
 91        filter_backends=[ObjectFilter],
 92    )
 93    def sync_status(self, request: Request, slug: str) -> Response:
 94        """Get provider's sync status"""
 95        source: KerberosSource = self.get_object()
 96
 97        status = {}
 98
 99        with source.sync_lock as lock_acquired:
100            # If we could not acquire the lock, it means a task is using it, and thus is running
101            status["is_running"] = not lock_acquired
102
103        sync_schedule = None
104        for schedule in source.schedules.all():
105            if schedule.actor_name == kerberos_sync.actor_name:
106                sync_schedule = schedule
107
108        if not sync_schedule:
109            return Response(SyncStatusSerializer(status).data)
110
111        last_task: Task = (
112            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
113            .order_by("-mtime")
114            .first()
115        )
116        last_successful_task: Task = (
117            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
118            .order_by("-mtime")
119            .first()
120        )
121
122        if last_task:
123            status["last_sync_status"] = last_task.aggregated_status
124        if last_successful_task:
125            status["last_successful_sync"] = last_successful_task.mtime
126
127        return Response(SyncStatusSerializer(status).data)
class KerberosSourceSerializer(authentik.core.api.sources.SourceSerializer):
21class KerberosSourceSerializer(SourceSerializer):
22    """Kerberos Source Serializer"""
23
24    connectivity = SerializerMethodField()
25
26    def get_connectivity(self, source: KerberosSource) -> dict[str, str] | None:
27        """Get cached source connectivity"""
28        return cache.get(CACHE_KEY_STATUS + source.slug, None)
29
30    class Meta:
31        model = KerberosSource
32        fields = SourceSerializer.Meta.fields + [
33            "group_matching_mode",
34            "realm",
35            "krb5_conf",
36            "kadmin_type",
37            "sync_users",
38            "sync_users_password",
39            "sync_principal",
40            "sync_password",
41            "sync_keytab",
42            "sync_ccache",
43            "connectivity",
44            "spnego_server_name",
45            "spnego_keytab",
46            "spnego_ccache",
47            "password_login_update_internal_password",
48            "sync_outgoing_trigger_mode",
49        ]
50        extra_kwargs = {
51            "sync_password": {"write_only": True},
52            "sync_keytab": {"write_only": True},
53            "spnego_keytab": {"write_only": True},
54        }

Kerberos Source Serializer

connectivity
def get_connectivity( self, source: authentik.sources.kerberos.models.KerberosSource) -> dict[str, str] | None:
26    def get_connectivity(self, source: KerberosSource) -> dict[str, str] | None:
27        """Get cached source connectivity"""
28        return cache.get(CACHE_KEY_STATUS + source.slug, None)

Get cached source connectivity

class KerberosSourceSerializer.Meta:
30    class Meta:
31        model = KerberosSource
32        fields = SourceSerializer.Meta.fields + [
33            "group_matching_mode",
34            "realm",
35            "krb5_conf",
36            "kadmin_type",
37            "sync_users",
38            "sync_users_password",
39            "sync_principal",
40            "sync_password",
41            "sync_keytab",
42            "sync_ccache",
43            "connectivity",
44            "spnego_server_name",
45            "spnego_keytab",
46            "spnego_ccache",
47            "password_login_update_internal_password",
48            "sync_outgoing_trigger_mode",
49        ]
50        extra_kwargs = {
51            "sync_password": {"write_only": True},
52            "sync_keytab": {"write_only": True},
53            "spnego_keytab": {"write_only": True},
54        }
fields = ['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'realm', 'krb5_conf', 'kadmin_type', 'sync_users', 'sync_users_password', 'sync_principal', 'sync_password', 'sync_keytab', 'sync_ccache', 'connectivity', 'spnego_server_name', 'spnego_keytab', 'spnego_ccache', 'password_login_update_internal_password', 'sync_outgoing_trigger_mode']
extra_kwargs = {'sync_password': {'write_only': True}, 'sync_keytab': {'write_only': True}, 'spnego_keytab': {'write_only': True}}
class KerberosSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
 57class KerberosSourceViewSet(UsedByMixin, ModelViewSet):
 58    """Kerberos Source Viewset"""
 59
 60    queryset = KerberosSource.objects.all()
 61    serializer_class = KerberosSourceSerializer
 62    lookup_field = "slug"
 63    filterset_fields = [
 64        "pbm_uuid",
 65        "name",
 66        "slug",
 67        "enabled",
 68        "realm",
 69        "kadmin_type",
 70        "sync_users",
 71        "sync_users_password",
 72        "sync_principal",
 73        "spnego_server_name",
 74        "password_login_update_internal_password",
 75    ]
 76    search_fields = [
 77        "name",
 78        "slug",
 79        "realm",
 80        "krb5_conf",
 81        "sync_principal",
 82        "spnego_server_name",
 83    ]
 84    ordering = ["name"]
 85
 86    @extend_schema(responses={200: SyncStatusSerializer()})
 87    @action(
 88        methods=["GET"],
 89        detail=True,
 90        pagination_class=None,
 91        url_path="sync/status",
 92        filter_backends=[ObjectFilter],
 93    )
 94    def sync_status(self, request: Request, slug: str) -> Response:
 95        """Get provider's sync status"""
 96        source: KerberosSource = self.get_object()
 97
 98        status = {}
 99
100        with source.sync_lock as lock_acquired:
101            # If we could not acquire the lock, it means a task is using it, and thus is running
102            status["is_running"] = not lock_acquired
103
104        sync_schedule = None
105        for schedule in source.schedules.all():
106            if schedule.actor_name == kerberos_sync.actor_name:
107                sync_schedule = schedule
108
109        if not sync_schedule:
110            return Response(SyncStatusSerializer(status).data)
111
112        last_task: Task = (
113            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
114            .order_by("-mtime")
115            .first()
116        )
117        last_successful_task: Task = (
118            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
119            .order_by("-mtime")
120            .first()
121        )
122
123        if last_task:
124            status["last_sync_status"] = last_task.aggregated_status
125        if last_successful_task:
126            status["last_successful_sync"] = last_successful_task.mtime
127
128        return Response(SyncStatusSerializer(status).data)

Kerberos Source Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'KerberosSourceSerializer'>
lookup_field = 'slug'
filterset_fields = ['pbm_uuid', 'name', 'slug', 'enabled', 'realm', 'kadmin_type', 'sync_users', 'sync_users_password', 'sync_principal', 'spnego_server_name', 'password_login_update_internal_password']
search_fields = ['name', 'slug', 'realm', 'krb5_conf', 'sync_principal', 'spnego_server_name']
ordering = ['name']
@extend_schema(responses={200: SyncStatusSerializer()})
@action(methods=['GET'], detail=True, pagination_class=None, url_path='sync/status', filter_backends=[ObjectFilter])
def sync_status( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
 86    @extend_schema(responses={200: SyncStatusSerializer()})
 87    @action(
 88        methods=["GET"],
 89        detail=True,
 90        pagination_class=None,
 91        url_path="sync/status",
 92        filter_backends=[ObjectFilter],
 93    )
 94    def sync_status(self, request: Request, slug: str) -> Response:
 95        """Get provider's sync status"""
 96        source: KerberosSource = self.get_object()
 97
 98        status = {}
 99
100        with source.sync_lock as lock_acquired:
101            # If we could not acquire the lock, it means a task is using it, and thus is running
102            status["is_running"] = not lock_acquired
103
104        sync_schedule = None
105        for schedule in source.schedules.all():
106            if schedule.actor_name == kerberos_sync.actor_name:
107                sync_schedule = schedule
108
109        if not sync_schedule:
110            return Response(SyncStatusSerializer(status).data)
111
112        last_task: Task = (
113            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
114            .order_by("-mtime")
115            .first()
116        )
117        last_successful_task: Task = (
118            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
119            .order_by("-mtime")
120            .first()
121        )
122
123        if last_task:
124            status["last_sync_status"] = last_task.aggregated_status
125        if last_successful_task:
126            status["last_successful_sync"] = last_successful_task.mtime
127
128        return Response(SyncStatusSerializer(status).data)

Get provider's sync status

name = None
description = None
suffix = None
detail = None
basename = None