authentik.sources.kerberos.api.source
Source API Views
"""Source API Views"""

from django.core.cache import cache
from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action
from rest_framework.fields import SerializerMethodField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet

from authentik.core.api.sources import SourceSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.lib.sync.api import SyncStatusSerializer
from authentik.rbac.filters import ObjectFilter
from authentik.sources.kerberos.models import KerberosSource
from authentik.sources.kerberos.tasks import CACHE_KEY_STATUS, kerberos_sync
from authentik.tasks.models import Task, TaskStatus


class KerberosSourceSerializer(SourceSerializer):
    """Kerberos Source Serializer"""

    # Computed field; its value is produced by get_connectivity below.
    connectivity = SerializerMethodField()

    def get_connectivity(self, source: KerberosSource) -> dict[str, str] | None:
        """Get cached source connectivity"""
        # The cache entry is keyed by the shared status prefix plus this source's slug;
        # when nothing is cached the explicit default of None is returned.
        cache_key = CACHE_KEY_STATUS + source.slug
        return cache.get(cache_key, None)

    class Meta:
        model = KerberosSource
        # Everything the base source serializer lists, followed by the Kerberos fields.
        fields = [
            *SourceSerializer.Meta.fields,
            "group_matching_mode",
            "realm",
            "krb5_conf",
            "kadmin_type",
            "sync_users",
            "sync_users_password",
            "sync_principal",
            "sync_password",
            "sync_keytab",
            "sync_ccache",
            "connectivity",
            "spnego_server_name",
            "spnego_keytab",
            "spnego_ccache",
            "password_login_update_internal_password",
            "sync_outgoing_trigger_mode",
        ]
        # Credential material is marked write-only: accepted on input, omitted from output.
        extra_kwargs = {
            secret_field: {"write_only": True}
            for secret_field in ("sync_password", "sync_keytab", "spnego_keytab")
        }


class KerberosSourceViewSet(UsedByMixin, ModelViewSet):
    """Kerberos Source Viewset"""

    queryset = KerberosSource.objects.all()
    serializer_class = KerberosSourceSerializer
    # Objects are looked up by their slug instead of the primary key.
    lookup_field = "slug"
    filterset_fields = [
        "pbm_uuid",
        "name",
        "slug",
        "enabled",
        "realm",
        "kadmin_type",
        "sync_users",
        "sync_users_password",
        "sync_principal",
        "spnego_server_name",
        "password_login_update_internal_password",
    ]
    search_fields = [
        "name",
        "slug",
        "realm",
        "krb5_conf",
        "sync_principal",
        "spnego_server_name",
    ]
    ordering = ["name"]

    @extend_schema(responses={200: SyncStatusSerializer()})
    @action(
        methods=["GET"],
        detail=True,
        pagination_class=None,
        url_path="sync/status",
        filter_backends=[ObjectFilter],
    )
    def sync_status(self, request: Request, slug: str) -> Response:
        """Get provider's sync status"""
        kerberos_source: KerberosSource = self.get_object()

        # Entering the lock yields whether we obtained it; failing to obtain it means
        # a task currently holds it, i.e. a sync is running.
        with kerberos_source.sync_lock as got_lock:
            payload = {"is_running": not got_lock}

        # Among this source's schedules, keep the last one bound to the Kerberos sync actor.
        candidates = [
            entry
            for entry in kerberos_source.schedules.all()
            if entry.actor_name == kerberos_sync.actor_name
        ]
        schedule = candidates[-1] if candidates else None
        if not schedule:
            # Without a matching schedule only the running flag can be reported.
            return Response(SyncStatusSerializer(payload).data)

        # Most recently modified task whose state is DONE or REJECTED.
        finished = schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
        latest_finished: Task | None = finished.order_by("-mtime").first()

        # Most recently modified task whose aggregated status is DONE or INFO.
        succeeded = schedule.tasks.filter(
            aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO)
        )
        latest_succeeded: Task | None = succeeded.order_by("-mtime").first()

        if latest_finished:
            payload["last_sync_status"] = latest_finished.aggregated_status
        if latest_succeeded:
            payload["last_successful_sync"] = latest_succeeded.mtime

        return Response(SyncStatusSerializer(payload).data)
class KerberosSourceSerializer(SourceSerializer):
    """Kerberos Source Serializer"""

    # Computed field; its value is produced by get_connectivity below.
    connectivity = SerializerMethodField()

    def get_connectivity(self, source: KerberosSource) -> dict[str, str] | None:
        """Get cached source connectivity"""
        # The cache entry is keyed by the shared status prefix plus this source's slug;
        # when nothing is cached the explicit default of None is returned.
        cache_key = CACHE_KEY_STATUS + source.slug
        return cache.get(cache_key, None)

    class Meta:
        model = KerberosSource
        # Everything the base source serializer lists, followed by the Kerberos fields.
        fields = [
            *SourceSerializer.Meta.fields,
            "group_matching_mode",
            "realm",
            "krb5_conf",
            "kadmin_type",
            "sync_users",
            "sync_users_password",
            "sync_principal",
            "sync_password",
            "sync_keytab",
            "sync_ccache",
            "connectivity",
            "spnego_server_name",
            "spnego_keytab",
            "spnego_ccache",
            "password_login_update_internal_password",
            "sync_outgoing_trigger_mode",
        ]
        # Credential material is marked write-only: accepted on input, omitted from output.
        extra_kwargs = {
            secret_field: {"write_only": True}
            for secret_field in ("sync_password", "sync_keytab", "spnego_keytab")
        }
Kerberos Source Serializer
def
get_connectivity( self, source: authentik.sources.kerberos.models.KerberosSource) -> dict[str, str] | None:
def get_connectivity(self, source: KerberosSource) -> dict[str, str] | None:
    """Get cached source connectivity"""
    # The cache entry is keyed by the shared status prefix plus this source's slug;
    # when nothing is cached the explicit default of None is returned.
    cache_key = CACHE_KEY_STATUS + source.slug
    return cache.get(cache_key, None)
Get cached source connectivity
Inherited Members
class
KerberosSourceSerializer.Meta:
class Meta:
    model = KerberosSource
    # Everything the base source serializer lists, followed by the Kerberos fields.
    fields = [
        *SourceSerializer.Meta.fields,
        "group_matching_mode",
        "realm",
        "krb5_conf",
        "kadmin_type",
        "sync_users",
        "sync_users_password",
        "sync_principal",
        "sync_password",
        "sync_keytab",
        "sync_ccache",
        "connectivity",
        "spnego_server_name",
        "spnego_keytab",
        "spnego_ccache",
        "password_login_update_internal_password",
        "sync_outgoing_trigger_mode",
    ]
    # Credential material is marked write-only: accepted on input, omitted from output.
    extra_kwargs = {
        secret_field: {"write_only": True}
        for secret_field in ("sync_password", "sync_keytab", "spnego_keytab")
    }
model =
<class 'authentik.sources.kerberos.models.KerberosSource'>
fields =
['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'realm', 'krb5_conf', 'kadmin_type', 'sync_users', 'sync_users_password', 'sync_principal', 'sync_password', 'sync_keytab', 'sync_ccache', 'connectivity', 'spnego_server_name', 'spnego_keytab', 'spnego_ccache', 'password_login_update_internal_password', 'sync_outgoing_trigger_mode']
class
KerberosSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class KerberosSourceViewSet(UsedByMixin, ModelViewSet):
    """Kerberos Source Viewset"""

    queryset = KerberosSource.objects.all()
    serializer_class = KerberosSourceSerializer
    # Objects are looked up by their slug instead of the primary key.
    lookup_field = "slug"
    filterset_fields = [
        "pbm_uuid",
        "name",
        "slug",
        "enabled",
        "realm",
        "kadmin_type",
        "sync_users",
        "sync_users_password",
        "sync_principal",
        "spnego_server_name",
        "password_login_update_internal_password",
    ]
    search_fields = [
        "name",
        "slug",
        "realm",
        "krb5_conf",
        "sync_principal",
        "spnego_server_name",
    ]
    ordering = ["name"]

    @extend_schema(responses={200: SyncStatusSerializer()})
    @action(
        methods=["GET"],
        detail=True,
        pagination_class=None,
        url_path="sync/status",
        filter_backends=[ObjectFilter],
    )
    def sync_status(self, request: Request, slug: str) -> Response:
        """Get provider's sync status"""
        kerberos_source: KerberosSource = self.get_object()

        # Entering the lock yields whether we obtained it; failing to obtain it means
        # a task currently holds it, i.e. a sync is running.
        with kerberos_source.sync_lock as got_lock:
            payload = {"is_running": not got_lock}

        # Among this source's schedules, keep the last one bound to the Kerberos sync actor.
        candidates = [
            entry
            for entry in kerberos_source.schedules.all()
            if entry.actor_name == kerberos_sync.actor_name
        ]
        schedule = candidates[-1] if candidates else None
        if not schedule:
            # Without a matching schedule only the running flag can be reported.
            return Response(SyncStatusSerializer(payload).data)

        # Most recently modified task whose state is DONE or REJECTED.
        finished = schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
        latest_finished: Task | None = finished.order_by("-mtime").first()

        # Most recently modified task whose aggregated status is DONE or INFO.
        succeeded = schedule.tasks.filter(
            aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO)
        )
        latest_succeeded: Task | None = succeeded.order_by("-mtime").first()

        if latest_finished:
            payload["last_sync_status"] = latest_finished.aggregated_status
        if latest_succeeded:
            payload["last_successful_sync"] = latest_succeeded.mtime

        return Response(SyncStatusSerializer(payload).data)
Kerberos Source Viewset
serializer_class =
<class 'KerberosSourceSerializer'>
filterset_fields =
['pbm_uuid', 'name', 'slug', 'enabled', 'realm', 'kadmin_type', 'sync_users', 'sync_users_password', 'sync_principal', 'spnego_server_name', 'password_login_update_internal_password']
@extend_schema(responses={200: SyncStatusSerializer()})
@action(methods=['GET'], detail=True, pagination_class=None, url_path='sync/status', filter_backends=[ObjectFilter])
def
sync_status( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
@extend_schema(responses={200: SyncStatusSerializer()})
@action(
    methods=["GET"],
    detail=True,
    pagination_class=None,
    url_path="sync/status",
    filter_backends=[ObjectFilter],
)
def sync_status(self, request: Request, slug: str) -> Response:
    """Get provider's sync status"""
    kerberos_source: KerberosSource = self.get_object()

    # Entering the lock yields whether we obtained it; failing to obtain it means
    # a task currently holds it, i.e. a sync is running.
    with kerberos_source.sync_lock as got_lock:
        payload = {"is_running": not got_lock}

    # Among this source's schedules, keep the last one bound to the Kerberos sync actor.
    candidates = [
        entry
        for entry in kerberos_source.schedules.all()
        if entry.actor_name == kerberos_sync.actor_name
    ]
    schedule = candidates[-1] if candidates else None
    if not schedule:
        # Without a matching schedule only the running flag can be reported.
        return Response(SyncStatusSerializer(payload).data)

    # Most recently modified task whose state is DONE or REJECTED.
    finished = schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
    latest_finished: Task | None = finished.order_by("-mtime").first()

    # Most recently modified task whose aggregated status is DONE or INFO.
    succeeded = schedule.tasks.filter(
        aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO)
    )
    latest_succeeded: Task | None = succeeded.order_by("-mtime").first()

    if latest_finished:
        payload["last_sync_status"] = latest_finished.aggregated_status
    if latest_succeeded:
        payload["last_successful_sync"] = latest_succeeded.mtime

    return Response(SyncStatusSerializer(payload).data)
Get provider's sync status