authentik.providers.ldap.api

LDAPProvider API Views

  1"""LDAPProvider API Views"""
  2
  3from django.db.models import QuerySet
  4from django.db.models.query import Q
  5from django.shortcuts import get_object_or_404
  6from django_filters.filters import BooleanFilter
  7from django_filters.filterset import FilterSet
  8from drf_spectacular.types import OpenApiTypes
  9from drf_spectacular.utils import OpenApiParameter, extend_schema
 10from rest_framework.decorators import action
 11from rest_framework.fields import BooleanField, CharField, ListField, SerializerMethodField
 12from rest_framework.mixins import ListModelMixin
 13from rest_framework.request import Request
 14from rest_framework.response import Response
 15from rest_framework.viewsets import GenericViewSet, ModelViewSet
 16
 17from authentik.core.api.providers import ProviderSerializer
 18from authentik.core.api.used_by import UsedByMixin
 19from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 20from authentik.core.apps import AppAccessWithoutBindings
 21from authentik.core.models import Application
 22from authentik.policies.api.exec import PolicyTestResultSerializer
 23from authentik.policies.engine import PolicyEngine
 24from authentik.policies.types import PolicyResult
 25from authentik.providers.ldap.models import LDAPProvider
 26
 27
 28class LDAPProviderSerializer(ProviderSerializer):
 29    """LDAPProvider Serializer"""
 30
 31    outpost_set = ListField(child=CharField(), read_only=True, source="outpost_set.all")
 32
 33    class Meta:
 34        model = LDAPProvider
 35        fields = ProviderSerializer.Meta.fields + [
 36            "base_dn",
 37            "certificate",
 38            "tls_server_name",
 39            "uid_start_number",
 40            "gid_start_number",
 41            "outpost_set",
 42            "search_mode",
 43            "bind_mode",
 44            "mfa_support",
 45        ]
 46        extra_kwargs = ProviderSerializer.Meta.extra_kwargs
 47
 48
 49class LDAPProviderFilter(FilterSet):
 50    """LDAP Provider filters"""
 51
 52    application__isnull = BooleanFilter(method="filter_application__isnull")
 53
 54    def filter_application__isnull(self, queryset: QuerySet, name, value):
 55        """Only return providers that are neither assigned to application,
 56        both as provider or application provider"""
 57        return queryset.filter(
 58            Q(backchannel_application__isnull=value) | Q(application__isnull=value)
 59        )
 60
 61    class Meta:
 62        model = LDAPProvider
 63        fields = {
 64            "application": ["isnull"],
 65            "name": ["iexact"],
 66            "authorization_flow__slug": ["iexact"],
 67            "base_dn": ["iexact"],
 68            "certificate__kp_uuid": ["iexact"],
 69            "certificate__name": ["iexact"],
 70            "tls_server_name": ["iexact"],
 71            "uid_start_number": ["iexact"],
 72            "gid_start_number": ["iexact"],
 73        }
 74
 75
 76class LDAPProviderViewSet(UsedByMixin, ModelViewSet):
 77    """LDAPProvider Viewset"""
 78
 79    queryset = LDAPProvider.objects.all()
 80    serializer_class = LDAPProviderSerializer
 81    filterset_class = LDAPProviderFilter
 82    search_fields = ["name"]
 83    ordering = ["name"]
 84
 85
 86class LDAPOutpostConfigSerializer(ModelSerializer):
 87    """LDAPProvider Serializer"""
 88
 89    application_slug = SerializerMethodField()
 90    bind_flow_slug = CharField(source="authorization_flow.slug")
 91    unbind_flow_slug = SerializerMethodField()
 92
 93    def get_application_slug(self, instance: LDAPProvider) -> str:
 94        """Prioritise backchannel slug over direct application slug"""
 95        if instance.backchannel_application:
 96            return instance.backchannel_application.slug
 97        return instance.application.slug
 98
 99    def get_unbind_flow_slug(self, instance: LDAPProvider) -> str | None:
100        """Get slug for unbind flow, defaulting to brand's default flow."""
101        flow = instance.invalidation_flow
102        if not flow and "request" in self.context:
103            request = self.context.get("request")
104            flow = request.brand.flow_invalidation
105        if not flow:
106            return None
107        return flow.slug
108
109    class Meta:
110        model = LDAPProvider
111        fields = [
112            "pk",
113            "name",
114            "base_dn",
115            "bind_flow_slug",
116            "unbind_flow_slug",
117            "application_slug",
118            "certificate",
119            "tls_server_name",
120            "uid_start_number",
121            "gid_start_number",
122            "search_mode",
123            "bind_mode",
124            "mfa_support",
125        ]
126
127
128class LDAPOutpostConfigViewSet(ListModelMixin, GenericViewSet):
129    """LDAPProvider Viewset"""
130
131    queryset = LDAPProvider.objects.filter(
132        Q(application__isnull=False) | Q(backchannel_application__isnull=False)
133    )
134    serializer_class = LDAPOutpostConfigSerializer
135    ordering = ["name"]
136    search_fields = ["name"]
137    filterset_fields = ["name"]
138
139    class LDAPCheckAccessSerializer(PassiveSerializer):
140        has_search_permission = BooleanField(required=False)
141        access = PolicyTestResultSerializer()
142
143    @extend_schema(
144        request=None,
145        parameters=[OpenApiParameter("app_slug", OpenApiTypes.STR)],
146        responses={
147            200: LDAPCheckAccessSerializer(),
148        },
149        operation_id="outposts_ldap_access_check",
150    )
151    @action(detail=True)
152    def check_access(self, request: Request, pk) -> Response:
153        """Check access to a single application by slug"""
154        provider = get_object_or_404(LDAPProvider, pk=pk)
155        application = get_object_or_404(Application, slug=request.query_params["app_slug"])
156        engine = PolicyEngine(application, request.user, request)
157        engine.empty_result = AppAccessWithoutBindings.get()
158        engine.use_cache = False
159        engine.build()
160        result = engine.result
161        access_response = PolicyResult(result.passing)
162        response = self.LDAPCheckAccessSerializer(
163            instance={
164                "has_search_permission": (
165                    request.user.has_perm("search_full_directory", provider)
166                    or request.user.has_perm("authentik_providers_ldap.search_full_directory")
167                ),
168                "access": access_response,
169            }
170        )
171        return Response(response.data)
class LDAPProviderSerializer(authentik.core.api.providers.ProviderSerializer):
29class LDAPProviderSerializer(ProviderSerializer):
30    """LDAPProvider Serializer"""
31
32    outpost_set = ListField(child=CharField(), read_only=True, source="outpost_set.all")
33
34    class Meta:
35        model = LDAPProvider
36        fields = ProviderSerializer.Meta.fields + [
37            "base_dn",
38            "certificate",
39            "tls_server_name",
40            "uid_start_number",
41            "gid_start_number",
42            "outpost_set",
43            "search_mode",
44            "bind_mode",
45            "mfa_support",
46        ]
47        extra_kwargs = ProviderSerializer.Meta.extra_kwargs

LDAPProvider Serializer

outpost_set
class LDAPProviderSerializer.Meta:
34    class Meta:
35        model = LDAPProvider
36        fields = ProviderSerializer.Meta.fields + [
37            "base_dn",
38            "certificate",
39            "tls_server_name",
40            "uid_start_number",
41            "gid_start_number",
42            "outpost_set",
43            "search_mode",
44            "bind_mode",
45            "mfa_support",
46        ]
47        extra_kwargs = ProviderSerializer.Meta.extra_kwargs
fields = ['pk', 'name', 'authentication_flow', 'authorization_flow', 'invalidation_flow', 'property_mappings', 'component', 'assigned_application_slug', 'assigned_application_name', 'assigned_backchannel_application_slug', 'assigned_backchannel_application_name', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'base_dn', 'certificate', 'tls_server_name', 'uid_start_number', 'gid_start_number', 'outpost_set', 'search_mode', 'bind_mode', 'mfa_support']
extra_kwargs = {'authorization_flow': {'required': True, 'allow_null': False}, 'invalidation_flow': {'required': True, 'allow_null': False}}
class LDAPProviderFilter(django_filters.filterset.FilterSet):
50class LDAPProviderFilter(FilterSet):
51    """LDAP Provider filters"""
52
53    application__isnull = BooleanFilter(method="filter_application__isnull")
54
55    def filter_application__isnull(self, queryset: QuerySet, name, value):
56        """Only return providers that are neither assigned to application,
57        both as provider or application provider"""
58        return queryset.filter(
59            Q(backchannel_application__isnull=value) | Q(application__isnull=value)
60        )
61
62    class Meta:
63        model = LDAPProvider
64        fields = {
65            "application": ["isnull"],
66            "name": ["iexact"],
67            "authorization_flow__slug": ["iexact"],
68            "base_dn": ["iexact"],
69            "certificate__kp_uuid": ["iexact"],
70            "certificate__name": ["iexact"],
71            "tls_server_name": ["iexact"],
72            "uid_start_number": ["iexact"],
73            "gid_start_number": ["iexact"],
74        }

LDAP Provider filters

application__isnull
def filter_application__isnull(self, queryset: django.db.models.query.QuerySet, name, value):
55    def filter_application__isnull(self, queryset: QuerySet, name, value):
56        """Only return providers that are neither assigned to application,
57        both as provider or application provider"""
58        return queryset.filter(
59            Q(backchannel_application__isnull=value) | Q(application__isnull=value)
60        )

Only return providers that are neither assigned to application, both as provider or application provider

declared_filters = OrderedDict({'application__isnull': <django_filters.filters.BooleanFilter object>})
base_filters = OrderedDict({'application__isnull': <django_filters.filters.BooleanFilter object>, 'name__iexact': <django_filters.filters.CharFilter object>, 'authorization_flow__slug__iexact': <django_filters.filters.CharFilter object>, 'base_dn__iexact': <django_filters.filters.CharFilter object>, 'certificate__kp_uuid__iexact': <django_filters.filters.UUIDFilter object>, 'certificate__name__iexact': <django_filters.filters.CharFilter object>, 'tls_server_name__iexact': <django_filters.filters.CharFilter object>, 'uid_start_number__iexact': <django_filters.filters.NumberFilter object>, 'gid_start_number__iexact': <django_filters.filters.NumberFilter object>})
class LDAPProviderFilter.Meta:
62    class Meta:
63        model = LDAPProvider
64        fields = {
65            "application": ["isnull"],
66            "name": ["iexact"],
67            "authorization_flow__slug": ["iexact"],
68            "base_dn": ["iexact"],
69            "certificate__kp_uuid": ["iexact"],
70            "certificate__name": ["iexact"],
71            "tls_server_name": ["iexact"],
72            "uid_start_number": ["iexact"],
73            "gid_start_number": ["iexact"],
74        }
fields = {'application': ['isnull'], 'name': ['iexact'], 'authorization_flow__slug': ['iexact'], 'base_dn': ['iexact'], 'certificate__kp_uuid': ['iexact'], 'certificate__name': ['iexact'], 'tls_server_name': ['iexact'], 'uid_start_number': ['iexact'], 'gid_start_number': ['iexact']}
class LDAPProviderViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
77class LDAPProviderViewSet(UsedByMixin, ModelViewSet):
78    """LDAPProvider Viewset"""
79
80    queryset = LDAPProvider.objects.all()
81    serializer_class = LDAPProviderSerializer
82    filterset_class = LDAPProviderFilter
83    search_fields = ["name"]
84    ordering = ["name"]

LDAPProvider Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'LDAPProviderSerializer'>
filterset_class = <class 'LDAPProviderFilter'>
search_fields = ['name']
ordering = ['name']
name = None
description = None
suffix = None
detail = None
basename = None
class LDAPOutpostConfigSerializer(authentik.core.api.utils.ModelSerializer):
 87class LDAPOutpostConfigSerializer(ModelSerializer):
 88    """LDAPProvider Serializer"""
 89
 90    application_slug = SerializerMethodField()
 91    bind_flow_slug = CharField(source="authorization_flow.slug")
 92    unbind_flow_slug = SerializerMethodField()
 93
 94    def get_application_slug(self, instance: LDAPProvider) -> str:
 95        """Prioritise backchannel slug over direct application slug"""
 96        if instance.backchannel_application:
 97            return instance.backchannel_application.slug
 98        return instance.application.slug
 99
100    def get_unbind_flow_slug(self, instance: LDAPProvider) -> str | None:
101        """Get slug for unbind flow, defaulting to brand's default flow."""
102        flow = instance.invalidation_flow
103        if not flow and "request" in self.context:
104            request = self.context.get("request")
105            flow = request.brand.flow_invalidation
106        if not flow:
107            return None
108        return flow.slug
109
110    class Meta:
111        model = LDAPProvider
112        fields = [
113            "pk",
114            "name",
115            "base_dn",
116            "bind_flow_slug",
117            "unbind_flow_slug",
118            "application_slug",
119            "certificate",
120            "tls_server_name",
121            "uid_start_number",
122            "gid_start_number",
123            "search_mode",
124            "bind_mode",
125            "mfa_support",
126        ]

LDAPProvider Serializer

application_slug
bind_flow_slug
unbind_flow_slug
def get_application_slug(self, instance: authentik.providers.ldap.models.LDAPProvider) -> str:
94    def get_application_slug(self, instance: LDAPProvider) -> str:
95        """Prioritise backchannel slug over direct application slug"""
96        if instance.backchannel_application:
97            return instance.backchannel_application.slug
98        return instance.application.slug

Prioritise backchannel slug over direct application slug

def get_unbind_flow_slug( self, instance: authentik.providers.ldap.models.LDAPProvider) -> str | None:
100    def get_unbind_flow_slug(self, instance: LDAPProvider) -> str | None:
101        """Get slug for unbind flow, defaulting to brand's default flow."""
102        flow = instance.invalidation_flow
103        if not flow and "request" in self.context:
104            request = self.context.get("request")
105            flow = request.brand.flow_invalidation
106        if not flow:
107            return None
108        return flow.slug

Get slug for unbind flow, defaulting to brand's default flow.

class LDAPOutpostConfigSerializer.Meta:
110    class Meta:
111        model = LDAPProvider
112        fields = [
113            "pk",
114            "name",
115            "base_dn",
116            "bind_flow_slug",
117            "unbind_flow_slug",
118            "application_slug",
119            "certificate",
120            "tls_server_name",
121            "uid_start_number",
122            "gid_start_number",
123            "search_mode",
124            "bind_mode",
125            "mfa_support",
126        ]
fields = ['pk', 'name', 'base_dn', 'bind_flow_slug', 'unbind_flow_slug', 'application_slug', 'certificate', 'tls_server_name', 'uid_start_number', 'gid_start_number', 'search_mode', 'bind_mode', 'mfa_support']
class LDAPOutpostConfigViewSet(rest_framework.mixins.ListModelMixin, rest_framework.viewsets.GenericViewSet):
129class LDAPOutpostConfigViewSet(ListModelMixin, GenericViewSet):
130    """LDAPProvider Viewset"""
131
132    queryset = LDAPProvider.objects.filter(
133        Q(application__isnull=False) | Q(backchannel_application__isnull=False)
134    )
135    serializer_class = LDAPOutpostConfigSerializer
136    ordering = ["name"]
137    search_fields = ["name"]
138    filterset_fields = ["name"]
139
140    class LDAPCheckAccessSerializer(PassiveSerializer):
141        has_search_permission = BooleanField(required=False)
142        access = PolicyTestResultSerializer()
143
144    @extend_schema(
145        request=None,
146        parameters=[OpenApiParameter("app_slug", OpenApiTypes.STR)],
147        responses={
148            200: LDAPCheckAccessSerializer(),
149        },
150        operation_id="outposts_ldap_access_check",
151    )
152    @action(detail=True)
153    def check_access(self, request: Request, pk) -> Response:
154        """Check access to a single application by slug"""
155        provider = get_object_or_404(LDAPProvider, pk=pk)
156        application = get_object_or_404(Application, slug=request.query_params["app_slug"])
157        engine = PolicyEngine(application, request.user, request)
158        engine.empty_result = AppAccessWithoutBindings.get()
159        engine.use_cache = False
160        engine.build()
161        result = engine.result
162        access_response = PolicyResult(result.passing)
163        response = self.LDAPCheckAccessSerializer(
164            instance={
165                "has_search_permission": (
166                    request.user.has_perm("search_full_directory", provider)
167                    or request.user.has_perm("authentik_providers_ldap.search_full_directory")
168                ),
169                "access": access_response,
170            }
171        )
172        return Response(response.data)

LDAPProvider Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'LDAPOutpostConfigSerializer'>
ordering = ['name']
search_fields = ['name']
filterset_fields = ['name']
@extend_schema(request=None, parameters=[OpenApiParameter('app_slug', OpenApiTypes.STR)], responses={200: LDAPCheckAccessSerializer()}, operation_id='outposts_ldap_access_check')
@action(detail=True)
def check_access( self, request: rest_framework.request.Request, pk) -> rest_framework.response.Response:
144    @extend_schema(
145        request=None,
146        parameters=[OpenApiParameter("app_slug", OpenApiTypes.STR)],
147        responses={
148            200: LDAPCheckAccessSerializer(),
149        },
150        operation_id="outposts_ldap_access_check",
151    )
152    @action(detail=True)
153    def check_access(self, request: Request, pk) -> Response:
154        """Check access to a single application by slug"""
155        provider = get_object_or_404(LDAPProvider, pk=pk)
156        application = get_object_or_404(Application, slug=request.query_params["app_slug"])
157        engine = PolicyEngine(application, request.user, request)
158        engine.empty_result = AppAccessWithoutBindings.get()
159        engine.use_cache = False
160        engine.build()
161        result = engine.result
162        access_response = PolicyResult(result.passing)
163        response = self.LDAPCheckAccessSerializer(
164            instance={
165                "has_search_permission": (
166                    request.user.has_perm("search_full_directory", provider)
167                    or request.user.has_perm("authentik_providers_ldap.search_full_directory")
168                ),
169                "access": access_response,
170            }
171        )
172        return Response(response.data)

Check access to a single application by slug

name = None
description = None
suffix = None
detail = None
basename = None
class LDAPOutpostConfigViewSet.LDAPCheckAccessSerializer(authentik.core.api.utils.PassiveSerializer):
140    class LDAPCheckAccessSerializer(PassiveSerializer):
141        has_search_permission = BooleanField(required=False)
142        access = PolicyTestResultSerializer()

Base serializer class which doesn't implement create/update methods

has_search_permission
access