authentik.sources.ldap.api

Source API Views

  1"""Source API Views"""
  2
  3from typing import Any
  4
  5from django.core.cache import cache
  6from django.utils.translation import gettext_lazy as _
  7from drf_spectacular.utils import extend_schema, inline_serializer
  8from rest_framework.decorators import action
  9from rest_framework.exceptions import ValidationError
 10from rest_framework.fields import DictField, ListField, SerializerMethodField
 11from rest_framework.relations import PrimaryKeyRelatedField
 12from rest_framework.request import Request
 13from rest_framework.response import Response
 14from rest_framework.viewsets import ModelViewSet
 15
 16from authentik.core.api.property_mappings import PropertyMappingFilterSet, PropertyMappingSerializer
 17from authentik.core.api.sources import (
 18    GroupSourceConnectionSerializer,
 19    GroupSourceConnectionViewSet,
 20    SourceSerializer,
 21    UserSourceConnectionSerializer,
 22    UserSourceConnectionViewSet,
 23)
 24from authentik.core.api.used_by import UsedByMixin
 25from authentik.crypto.models import CertificateKeyPair
 26from authentik.lib.sync.api import SyncStatusSerializer
 27from authentik.rbac.filters import ObjectFilter
 28from authentik.sources.ldap.models import (
 29    GroupLDAPSourceConnection,
 30    LDAPSource,
 31    LDAPSourcePropertyMapping,
 32    UserLDAPSourceConnection,
 33)
 34from authentik.sources.ldap.tasks import CACHE_KEY_STATUS, SYNC_CLASSES, ldap_sync
 35from authentik.tasks.models import Task, TaskStatus
 36
 37
 38class LDAPSourceSerializer(SourceSerializer):
 39    """LDAP Source Serializer"""
 40
 41    connectivity = SerializerMethodField()
 42    client_certificate = PrimaryKeyRelatedField(
 43        allow_null=True,
 44        help_text="Client certificate to authenticate against the LDAP Server's Certificate.",
 45        queryset=CertificateKeyPair.objects.exclude(
 46            key_data__exact="",
 47        ),
 48        required=False,
 49    )
 50
 51    def get_connectivity(self, source: LDAPSource) -> dict[str, dict[str, str]] | None:
 52        """Get cached source connectivity"""
 53        return cache.get(CACHE_KEY_STATUS + source.slug, None)
 54
 55    def validate_sync_users_password(self, sync_users_password: bool) -> bool:
 56        """Check that only a single source has password_sync on"""
 57        if sync_users_password:
 58            sources = LDAPSource.objects.filter(sync_users_password=True)
 59            if self.instance:
 60                sources = sources.exclude(pk=self.instance.pk)
 61            if sources.exists():
 62                raise ValidationError(
 63                    _("Only a single LDAP Source with password synchronization is allowed")
 64                )
 65        return sync_users_password
 66
 67    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
 68        """Validate property mappings with sync_ flags"""
 69        types = ["user", "group"]
 70        for type in types:
 71            toggle_value = attrs.get(f"sync_{type}s", False)
 72            mappings_field = f"{type}_property_mappings"
 73            mappings_value = attrs.get(mappings_field, [])
 74            if toggle_value and len(mappings_value) == 0:
 75                raise ValidationError(
 76                    {
 77                        mappings_field: _(
 78                            (
 79                                "When 'Sync {type}s' is enabled, '{type}s property "
 80                                "mappings' cannot be empty."
 81                            ).format(type=type)
 82                        )
 83                    }
 84                )
 85        return super().validate(attrs)
 86
 87    class Meta:
 88        model = LDAPSource
 89        fields = SourceSerializer.Meta.fields + [
 90            "server_uri",
 91            "peer_certificate",
 92            "client_certificate",
 93            "bind_cn",
 94            "bind_password",
 95            "start_tls",
 96            "sni",
 97            "base_dn",
 98            "additional_user_dn",
 99            "additional_group_dn",
100            "user_object_filter",
101            "group_object_filter",
102            "group_membership_field",
103            "user_membership_attribute",
104            "object_uniqueness_field",
105            "password_login_update_internal_password",
106            "sync_users",
107            "sync_users_password",
108            "sync_groups",
109            "sync_parent_group",
110            "connectivity",
111            "lookup_groups_from_user",
112            "delete_not_found_objects",
113            "sync_outgoing_trigger_mode",
114        ]
115        extra_kwargs = {"bind_password": {"write_only": True}}
116
117
class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
    """LDAP Source Viewset"""

    queryset = LDAPSource.objects.all()
    serializer_class = LDAPSourceSerializer
    # Detail routes address a source by slug rather than by primary key.
    lookup_field = "slug"
    filterset_fields = [
        "pbm_uuid",
        "name",
        "slug",
        "enabled",
        "server_uri",
        "bind_cn",
        "peer_certificate",
        "client_certificate",
        "start_tls",
        "sni",
        "base_dn",
        "additional_user_dn",
        "additional_group_dn",
        "user_object_filter",
        "group_object_filter",
        "group_membership_field",
        "user_membership_attribute",
        "object_uniqueness_field",
        "password_login_update_internal_password",
        "sync_users",
        "sync_users_password",
        "sync_groups",
        "sync_parent_group",
        "user_property_mappings",
        "group_property_mappings",
        "lookup_groups_from_user",
        "delete_not_found_objects",
    ]
    search_fields = ["name", "slug"]
    ordering = ["name"]

    @extend_schema(responses={200: SyncStatusSerializer()})
    @action(
        methods=["GET"],
        detail=True,
        pagination_class=None,
        url_path="sync/status",
        filter_backends=[ObjectFilter],
    )
    def sync_status(self, request: Request, slug: str) -> Response:
        """Get provider's sync status"""
        # NOTE(review): the docstring says "provider" although this endpoint
        # reports on an LDAP source. It is left untouched here because the text
        # may be surfaced in the generated API schema - confirm before rewording.
        source: LDAPSource = self.get_object()

        # Failing to take the lock means a sync task currently holds it, i.e.
        # a sync is in progress.
        with source.sync_lock as lock_acquired:
            report = {"is_running": not lock_acquired}

        # Only schedules belonging to the ldap_sync actor are relevant. If more
        # than one matches, the last one iterated is used.
        matching = [
            candidate
            for candidate in source.schedules.all()
            if candidate.actor_name == ldap_sync.actor_name
        ]
        if not matching:
            # No sync schedule: only the running flag can be reported.
            return Response(SyncStatusSerializer(report).data)
        sync_schedule = matching[-1]

        # Most recently modified task whose state is DONE or REJECTED.
        latest_finished: Task | None = (
            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
            .order_by("-mtime")
            .first()
        )
        # Most recently modified task whose aggregated status is DONE or INFO.
        latest_successful: Task | None = (
            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
            .order_by("-mtime")
            .first()
        )

        if latest_finished:
            report["last_sync_status"] = latest_finished.aggregated_status
        if latest_successful:
            report["last_successful_sync"] = latest_successful.mtime

        return Response(SyncStatusSerializer(report).data)

    @extend_schema(
        responses={
            200: inline_serializer(
                "LDAPDebugSerializer",
                fields={
                    "user": ListField(child=DictField(), read_only=True),
                    "group": ListField(child=DictField(), read_only=True),
                    "membership": ListField(child=DictField(), read_only=True),
                },
            ),
        }
    )
    @action(methods=["GET"], detail=True, pagination_class=None, filter_backends=[])
    def debug(self, request: Request, slug: str) -> Response:
        """Get raw LDAP data to debug"""
        source = self.get_object()
        collected = {}
        for sync_class in SYNC_CLASSES:
            # One result list per synchronizer, keyed by the synchronizer's name.
            bucket = collected.setdefault(sync_class.name(), [])
            # A fresh, unsaved Task is handed to the synchronizer; get_objects
            # is asked for at most 10 entries via size_limit.
            for page in sync_class(source, Task()).get_objects(size_limit=10):
                for entry in page:
                    # Each entry is treated as a dict; the raw_* keys are
                    # dropped before the entry is returned to the client.
                    entry.pop("raw_attributes", None)
                    entry.pop("raw_dn", None)
                    bucket.append(entry)
        return Response(data=collected)
227
228
class LDAPSourcePropertyMappingSerializer(PropertyMappingSerializer):
    """LDAP PropertyMapping Serializer"""

    # Binds the generic property-mapping serializer to LDAPSourcePropertyMapping.
    # This Meta does not subclass PropertyMappingSerializer.Meta; only the
    # parent's `fields` list is reused, so no extra fields are exposed.
    class Meta:
        model = LDAPSourcePropertyMapping
        fields = PropertyMappingSerializer.Meta.fields
235
236
class LDAPSourcePropertyMappingFilter(PropertyMappingFilterSet):
    """Filter for LDAPSourcePropertyMapping"""

    # Inherits every Meta option from PropertyMappingFilterSet.Meta and only
    # overrides the model the filters are built for.
    class Meta(PropertyMappingFilterSet.Meta):
        model = LDAPSourcePropertyMapping
242
243
class LDAPSourcePropertyMappingViewSet(UsedByMixin, ModelViewSet):
    """LDAP PropertyMapping Viewset"""

    # Model viewset over all LDAPSourcePropertyMapping rows, wired to the
    # serializer and filter set declared for that model in this module.
    queryset = LDAPSourcePropertyMapping.objects.all()
    serializer_class = LDAPSourcePropertyMappingSerializer
    filterset_class = LDAPSourcePropertyMappingFilter
    # Searchable by name; default ordering is by name as well.
    search_fields = ["name"]
    ordering = ["name"]
252
253
class UserLDAPSourceConnectionSerializer(UserSourceConnectionSerializer):
    # Serializer for UserLDAPSourceConnection. All Meta options are inherited
    # from UserSourceConnectionSerializer.Meta; only the model is overridden.
    class Meta(UserSourceConnectionSerializer.Meta):
        model = UserLDAPSourceConnection
257
258
class UserLDAPSourceConnectionViewSet(UserSourceConnectionViewSet, ModelViewSet):
    # Viewset over all UserLDAPSourceConnection rows. Behaviour comes from
    # UserSourceConnectionViewSet and ModelViewSet; this class only supplies
    # the LDAP-specific queryset and serializer.
    queryset = UserLDAPSourceConnection.objects.all()
    serializer_class = UserLDAPSourceConnectionSerializer
262
263
class GroupLDAPSourceConnectionSerializer(GroupSourceConnectionSerializer):
    # Serializer for GroupLDAPSourceConnection. All Meta options are inherited
    # from GroupSourceConnectionSerializer.Meta; only the model is overridden.
    class Meta(GroupSourceConnectionSerializer.Meta):
        model = GroupLDAPSourceConnection
267
268
class GroupLDAPSourceConnectionViewSet(GroupSourceConnectionViewSet, ModelViewSet):
    # Viewset over all GroupLDAPSourceConnection rows. Behaviour comes from
    # GroupSourceConnectionViewSet and ModelViewSet; this class only supplies
    # the LDAP-specific queryset and serializer.
    queryset = GroupLDAPSourceConnection.objects.all()
    serializer_class = GroupLDAPSourceConnectionSerializer
class LDAPSourceSerializer(authentik.core.api.sources.SourceSerializer):
 39class LDAPSourceSerializer(SourceSerializer):
 40    """LDAP Source Serializer"""
 41
 42    connectivity = SerializerMethodField()
 43    client_certificate = PrimaryKeyRelatedField(
 44        allow_null=True,
 45        help_text="Client certificate to authenticate against the LDAP Server's Certificate.",
 46        queryset=CertificateKeyPair.objects.exclude(
 47            key_data__exact="",
 48        ),
 49        required=False,
 50    )
 51
 52    def get_connectivity(self, source: LDAPSource) -> dict[str, dict[str, str]] | None:
 53        """Get cached source connectivity"""
 54        return cache.get(CACHE_KEY_STATUS + source.slug, None)
 55
 56    def validate_sync_users_password(self, sync_users_password: bool) -> bool:
 57        """Check that only a single source has password_sync on"""
 58        if sync_users_password:
 59            sources = LDAPSource.objects.filter(sync_users_password=True)
 60            if self.instance:
 61                sources = sources.exclude(pk=self.instance.pk)
 62            if sources.exists():
 63                raise ValidationError(
 64                    _("Only a single LDAP Source with password synchronization is allowed")
 65                )
 66        return sync_users_password
 67
 68    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
 69        """Validate property mappings with sync_ flags"""
 70        types = ["user", "group"]
 71        for type in types:
 72            toggle_value = attrs.get(f"sync_{type}s", False)
 73            mappings_field = f"{type}_property_mappings"
 74            mappings_value = attrs.get(mappings_field, [])
 75            if toggle_value and len(mappings_value) == 0:
 76                raise ValidationError(
 77                    {
 78                        mappings_field: _(
 79                            (
 80                                "When 'Sync {type}s' is enabled, '{type}s property "
 81                                "mappings' cannot be empty."
 82                            ).format(type=type)
 83                        )
 84                    }
 85                )
 86        return super().validate(attrs)
 87
 88    class Meta:
 89        model = LDAPSource
 90        fields = SourceSerializer.Meta.fields + [
 91            "server_uri",
 92            "peer_certificate",
 93            "client_certificate",
 94            "bind_cn",
 95            "bind_password",
 96            "start_tls",
 97            "sni",
 98            "base_dn",
 99            "additional_user_dn",
100            "additional_group_dn",
101            "user_object_filter",
102            "group_object_filter",
103            "group_membership_field",
104            "user_membership_attribute",
105            "object_uniqueness_field",
106            "password_login_update_internal_password",
107            "sync_users",
108            "sync_users_password",
109            "sync_groups",
110            "sync_parent_group",
111            "connectivity",
112            "lookup_groups_from_user",
113            "delete_not_found_objects",
114            "sync_outgoing_trigger_mode",
115        ]
116        extra_kwargs = {"bind_password": {"write_only": True}}

LDAP Source Serializer

connectivity
client_certificate
def get_connectivity( self, source: authentik.sources.ldap.models.LDAPSource) -> dict[str, dict[str, str]] | None:
52    def get_connectivity(self, source: LDAPSource) -> dict[str, dict[str, str]] | None:
53        """Get cached source connectivity"""
54        return cache.get(CACHE_KEY_STATUS + source.slug, None)

Get cached source connectivity

def validate_sync_users_password(self, sync_users_password: bool) -> bool:
56    def validate_sync_users_password(self, sync_users_password: bool) -> bool:
57        """Check that only a single source has password_sync on"""
58        if sync_users_password:
59            sources = LDAPSource.objects.filter(sync_users_password=True)
60            if self.instance:
61                sources = sources.exclude(pk=self.instance.pk)
62            if sources.exists():
63                raise ValidationError(
64                    _("Only a single LDAP Source with password synchronization is allowed")
65                )
66        return sync_users_password

Check that only a single source has password_sync on

def validate(self, attrs: dict[str, typing.Any]) -> dict[str, typing.Any]:
68    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
69        """Validate property mappings with sync_ flags"""
70        types = ["user", "group"]
71        for type in types:
72            toggle_value = attrs.get(f"sync_{type}s", False)
73            mappings_field = f"{type}_property_mappings"
74            mappings_value = attrs.get(mappings_field, [])
75            if toggle_value and len(mappings_value) == 0:
76                raise ValidationError(
77                    {
78                        mappings_field: _(
79                            (
80                                "When 'Sync {type}s' is enabled, '{type}s property "
81                                "mappings' cannot be empty."
82                            ).format(type=type)
83                        )
84                    }
85                )
86        return super().validate(attrs)

Validate property mappings with sync_ flags

class LDAPSourceSerializer.Meta:
 88    class Meta:
 89        model = LDAPSource
 90        fields = SourceSerializer.Meta.fields + [
 91            "server_uri",
 92            "peer_certificate",
 93            "client_certificate",
 94            "bind_cn",
 95            "bind_password",
 96            "start_tls",
 97            "sni",
 98            "base_dn",
 99            "additional_user_dn",
100            "additional_group_dn",
101            "user_object_filter",
102            "group_object_filter",
103            "group_membership_field",
104            "user_membership_attribute",
105            "object_uniqueness_field",
106            "password_login_update_internal_password",
107            "sync_users",
108            "sync_users_password",
109            "sync_groups",
110            "sync_parent_group",
111            "connectivity",
112            "lookup_groups_from_user",
113            "delete_not_found_objects",
114            "sync_outgoing_trigger_mode",
115        ]
116        extra_kwargs = {"bind_password": {"write_only": True}}
fields = ['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'server_uri', 'peer_certificate', 'client_certificate', 'bind_cn', 'bind_password', 'start_tls', 'sni', 'base_dn', 'additional_user_dn', 'additional_group_dn', 'user_object_filter', 'group_object_filter', 'group_membership_field', 'user_membership_attribute', 'object_uniqueness_field', 'password_login_update_internal_password', 'sync_users', 'sync_users_password', 'sync_groups', 'sync_parent_group', 'connectivity', 'lookup_groups_from_user', 'delete_not_found_objects', 'sync_outgoing_trigger_mode']
extra_kwargs = {'bind_password': {'write_only': True}}
class LDAPSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
119class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
120    """LDAP Source Viewset"""
121
122    queryset = LDAPSource.objects.all()
123    serializer_class = LDAPSourceSerializer
124    lookup_field = "slug"
125    filterset_fields = [
126        "pbm_uuid",
127        "name",
128        "slug",
129        "enabled",
130        "server_uri",
131        "bind_cn",
132        "peer_certificate",
133        "client_certificate",
134        "start_tls",
135        "sni",
136        "base_dn",
137        "additional_user_dn",
138        "additional_group_dn",
139        "user_object_filter",
140        "group_object_filter",
141        "group_membership_field",
142        "user_membership_attribute",
143        "object_uniqueness_field",
144        "password_login_update_internal_password",
145        "sync_users",
146        "sync_users_password",
147        "sync_groups",
148        "sync_parent_group",
149        "user_property_mappings",
150        "group_property_mappings",
151        "lookup_groups_from_user",
152        "delete_not_found_objects",
153    ]
154    search_fields = ["name", "slug"]
155    ordering = ["name"]
156
157    @extend_schema(responses={200: SyncStatusSerializer()})
158    @action(
159        methods=["GET"],
160        detail=True,
161        pagination_class=None,
162        url_path="sync/status",
163        filter_backends=[ObjectFilter],
164    )
165    def sync_status(self, request: Request, slug: str) -> Response:
166        """Get provider's sync status"""
167        source: LDAPSource = self.get_object()
168
169        status = {}
170
171        with source.sync_lock as lock_acquired:
172            # If we could not acquire the lock, it means a task is using it, and thus is running
173            status["is_running"] = not lock_acquired
174
175        sync_schedule = None
176        for schedule in source.schedules.all():
177            if schedule.actor_name == ldap_sync.actor_name:
178                sync_schedule = schedule
179
180        if not sync_schedule:
181            return Response(SyncStatusSerializer(status).data)
182
183        last_task: Task = (
184            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
185            .order_by("-mtime")
186            .first()
187        )
188        last_successful_task: Task = (
189            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
190            .order_by("-mtime")
191            .first()
192        )
193
194        if last_task:
195            status["last_sync_status"] = last_task.aggregated_status
196        if last_successful_task:
197            status["last_successful_sync"] = last_successful_task.mtime
198
199        return Response(SyncStatusSerializer(status).data)
200
201    @extend_schema(
202        responses={
203            200: inline_serializer(
204                "LDAPDebugSerializer",
205                fields={
206                    "user": ListField(child=DictField(), read_only=True),
207                    "group": ListField(child=DictField(), read_only=True),
208                    "membership": ListField(child=DictField(), read_only=True),
209                },
210            ),
211        }
212    )
213    @action(methods=["GET"], detail=True, pagination_class=None, filter_backends=[])
214    def debug(self, request: Request, slug: str) -> Response:
215        """Get raw LDAP data to debug"""
216        source = self.get_object()
217        all_objects = {}
218        for sync_class in SYNC_CLASSES:
219            class_name = sync_class.name()
220            all_objects.setdefault(class_name, [])
221            for page in sync_class(source, Task()).get_objects(size_limit=10):
222                for obj in page:
223                    obj: dict
224                    obj.pop("raw_attributes", None)
225                    obj.pop("raw_dn", None)
226                    all_objects[class_name].append(obj)
227        return Response(data=all_objects)

LDAP Source Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'LDAPSourceSerializer'>
lookup_field = 'slug'
filterset_fields = ['pbm_uuid', 'name', 'slug', 'enabled', 'server_uri', 'bind_cn', 'peer_certificate', 'client_certificate', 'start_tls', 'sni', 'base_dn', 'additional_user_dn', 'additional_group_dn', 'user_object_filter', 'group_object_filter', 'group_membership_field', 'user_membership_attribute', 'object_uniqueness_field', 'password_login_update_internal_password', 'sync_users', 'sync_users_password', 'sync_groups', 'sync_parent_group', 'user_property_mappings', 'group_property_mappings', 'lookup_groups_from_user', 'delete_not_found_objects']
search_fields = ['name', 'slug']
ordering = ['name']
@extend_schema(responses={200: SyncStatusSerializer()})
@action(methods=['GET'], detail=True, pagination_class=None, url_path='sync/status', filter_backends=[ObjectFilter])
def sync_status( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
157    @extend_schema(responses={200: SyncStatusSerializer()})
158    @action(
159        methods=["GET"],
160        detail=True,
161        pagination_class=None,
162        url_path="sync/status",
163        filter_backends=[ObjectFilter],
164    )
165    def sync_status(self, request: Request, slug: str) -> Response:
166        """Get provider's sync status"""
167        source: LDAPSource = self.get_object()
168
169        status = {}
170
171        with source.sync_lock as lock_acquired:
172            # If we could not acquire the lock, it means a task is using it, and thus is running
173            status["is_running"] = not lock_acquired
174
175        sync_schedule = None
176        for schedule in source.schedules.all():
177            if schedule.actor_name == ldap_sync.actor_name:
178                sync_schedule = schedule
179
180        if not sync_schedule:
181            return Response(SyncStatusSerializer(status).data)
182
183        last_task: Task = (
184            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
185            .order_by("-mtime")
186            .first()
187        )
188        last_successful_task: Task = (
189            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
190            .order_by("-mtime")
191            .first()
192        )
193
194        if last_task:
195            status["last_sync_status"] = last_task.aggregated_status
196        if last_successful_task:
197            status["last_successful_sync"] = last_successful_task.mtime
198
199        return Response(SyncStatusSerializer(status).data)

Get provider's sync status

@extend_schema(responses={200: inline_serializer('LDAPDebugSerializer', fields={'user': ListField(child=DictField(), read_only=True), 'group': ListField(child=DictField(), read_only=True), 'membership': ListField(child=DictField(), read_only=True)})})
@action(methods=['GET'], detail=True, pagination_class=None, filter_backends=[])
def debug( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
201    @extend_schema(
202        responses={
203            200: inline_serializer(
204                "LDAPDebugSerializer",
205                fields={
206                    "user": ListField(child=DictField(), read_only=True),
207                    "group": ListField(child=DictField(), read_only=True),
208                    "membership": ListField(child=DictField(), read_only=True),
209                },
210            ),
211        }
212    )
213    @action(methods=["GET"], detail=True, pagination_class=None, filter_backends=[])
214    def debug(self, request: Request, slug: str) -> Response:
215        """Get raw LDAP data to debug"""
216        source = self.get_object()
217        all_objects = {}
218        for sync_class in SYNC_CLASSES:
219            class_name = sync_class.name()
220            all_objects.setdefault(class_name, [])
221            for page in sync_class(source, Task()).get_objects(size_limit=10):
222                for obj in page:
223                    obj: dict
224                    obj.pop("raw_attributes", None)
225                    obj.pop("raw_dn", None)
226                    all_objects[class_name].append(obj)
227        return Response(data=all_objects)

Get raw LDAP data to debug

name = None
description = None
suffix = None
detail = None
basename = None
class LDAPSourcePropertyMappingSerializer(authentik.core.api.property_mappings.PropertyMappingSerializer):
230class LDAPSourcePropertyMappingSerializer(PropertyMappingSerializer):
231    """LDAP PropertyMapping Serializer"""
232
233    class Meta:
234        model = LDAPSourcePropertyMapping
235        fields = PropertyMappingSerializer.Meta.fields

LDAP PropertyMapping Serializer

class LDAPSourcePropertyMappingSerializer.Meta:
233    class Meta:
234        model = LDAPSourcePropertyMapping
235        fields = PropertyMappingSerializer.Meta.fields
fields = ['pk', 'managed', 'name', 'expression', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name']
class LDAPSourcePropertyMappingFilter(authentik.core.api.property_mappings.PropertyMappingFilterSet):
238class LDAPSourcePropertyMappingFilter(PropertyMappingFilterSet):
239    """Filter for LDAPSourcePropertyMapping"""
240
241    class Meta(PropertyMappingFilterSet.Meta):
242        model = LDAPSourcePropertyMapping

Filter for LDAPSourcePropertyMapping

declared_filters = OrderedDict({'managed': <django_filters.filters.AllValuesMultipleFilter object>, 'managed__isnull': <django_filters.filters.BooleanFilter object>})
base_filters = OrderedDict({'name': <django_filters.filters.CharFilter object>, 'managed': <django_filters.filters.AllValuesMultipleFilter object>, 'managed__isnull': <django_filters.filters.BooleanFilter object>})
class LDAPSourcePropertyMappingFilter.Meta(authentik.core.api.property_mappings.PropertyMappingFilterSet.Meta):
241    class Meta(PropertyMappingFilterSet.Meta):
242        model = LDAPSourcePropertyMapping
class LDAPSourcePropertyMappingViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
245class LDAPSourcePropertyMappingViewSet(UsedByMixin, ModelViewSet):
246    """LDAP PropertyMapping Viewset"""
247
248    queryset = LDAPSourcePropertyMapping.objects.all()
249    serializer_class = LDAPSourcePropertyMappingSerializer
250    filterset_class = LDAPSourcePropertyMappingFilter
251    search_fields = ["name"]
252    ordering = ["name"]

LDAP PropertyMapping Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'LDAPSourcePropertyMappingSerializer'>
filterset_class = <class 'LDAPSourcePropertyMappingFilter'>
search_fields = ['name']
ordering = ['name']
name = None
description = None
suffix = None
detail = None
basename = None
class UserLDAPSourceConnectionSerializer(authentik.core.api.sources.UserSourceConnectionSerializer):
255class UserLDAPSourceConnectionSerializer(UserSourceConnectionSerializer):
256    class Meta(UserSourceConnectionSerializer.Meta):
257        model = UserLDAPSourceConnection

User source connection

class UserLDAPSourceConnectionSerializer.Meta(authentik.core.api.sources.UserSourceConnectionSerializer.Meta):
256    class Meta(UserSourceConnectionSerializer.Meta):
257        model = UserLDAPSourceConnection
class UserLDAPSourceConnectionViewSet(authentik.core.api.sources.UserSourceConnectionViewSet, rest_framework.viewsets.ModelViewSet):
260class UserLDAPSourceConnectionViewSet(UserSourceConnectionViewSet, ModelViewSet):
261    queryset = UserLDAPSourceConnection.objects.all()
262    serializer_class = UserLDAPSourceConnectionSerializer

User-source connection Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'UserLDAPSourceConnectionSerializer'>
name = None
description = None
suffix = None
detail = None
basename = None
class GroupLDAPSourceConnectionSerializer(authentik.core.api.sources.GroupSourceConnectionSerializer):
265class GroupLDAPSourceConnectionSerializer(GroupSourceConnectionSerializer):
266    class Meta(GroupSourceConnectionSerializer.Meta):
267        model = GroupLDAPSourceConnection

Group Source Connection

class GroupLDAPSourceConnectionSerializer.Meta(authentik.core.api.sources.GroupSourceConnectionSerializer.Meta):
266    class Meta(GroupSourceConnectionSerializer.Meta):
267        model = GroupLDAPSourceConnection
class GroupLDAPSourceConnectionViewSet(authentik.core.api.sources.GroupSourceConnectionViewSet, rest_framework.viewsets.ModelViewSet):
270class GroupLDAPSourceConnectionViewSet(GroupSourceConnectionViewSet, ModelViewSet):
271    queryset = GroupLDAPSourceConnection.objects.all()
272    serializer_class = GroupLDAPSourceConnectionSerializer

Group-source connection Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'GroupLDAPSourceConnectionSerializer'>
name = None
description = None
suffix = None
detail = None
basename = None