authentik.sources.ldap.models

authentik LDAP Models

  1"""authentik LDAP Models"""
  2
  3from os import chmod
  4from os.path import dirname, exists
  5from shutil import rmtree
  6from ssl import CERT_REQUIRED
  7from tempfile import NamedTemporaryFile, mkdtemp
  8from typing import Any
  9
 10import pglock
 11from django.db import connection, models
 12from django.templatetags.static import static
 13from django.utils.translation import gettext_lazy as _
 14from ldap3 import ALL, NONE, RANDOM, Connection, Server, ServerPool, Tls
 15from ldap3.core.exceptions import (
 16    LDAPAdminLimitExceededResult,
 17    LDAPAttributeError,
 18    LDAPException,
 19    LDAPInsufficientAccessRightsResult,
 20    LDAPSchemaError,
 21)
 22from rest_framework.serializers import Serializer
 23from structlog.stdlib import get_logger
 24
 25from authentik.core.models import (
 26    Group,
 27    GroupSourceConnection,
 28    PropertyMapping,
 29    UserSourceConnection,
 30)
 31from authentik.crypto.models import CertificateKeyPair
 32from authentik.lib.config import CONFIG
 33from authentik.lib.models import DomainlessURLValidator
 34from authentik.lib.sync.incoming.models import IncomingSyncSource
 35from authentik.lib.utils.time import fqdn_rand
 36from authentik.tasks.schedules.common import ScheduleSpec
 37
# Passed to ldap3 as both `connect_timeout` (Server) and `receive_timeout` (Connection).
# NOTE(review): ldap3 documents these values as seconds -- confirm if the unit matters.
LDAP_TIMEOUT = 15
# Key under `attributes` where the value of the source's uniqueness field is still stored.
LDAP_UNIQUENESS = "ldap_uniq"
"""Deprecated, don't use"""
# Key under `attributes` holding the object's DN; also the default for
# `LDAPSource.user_membership_attribute`.
LDAP_DISTINGUISHED_NAME = "distinguishedName"
# Module-level structlog logger.
LOGGER = get_logger()
 43
 44
 45def flatten(value: Any) -> Any:
 46    """Flatten `value` if its a list, set or tuple"""
 47    if isinstance(value, list | set | tuple):
 48        if len(value) < 1:
 49            return None
 50        if isinstance(value, set):
 51            return value.pop()
 52        return value[0]
 53    return value
 54
 55
 56class MultiURLValidator(DomainlessURLValidator):
 57    """Same as DomainlessURLValidator but supports multiple URLs separated with a comma."""
 58
 59    def __call__(self, value: str):
 60        if "," in value:
 61            for url in value.split(","):
 62                super().__call__(url)
 63        else:
 64            super().__call__(value)
 65
 66
 67class LDAPSource(IncomingSyncSource):
 68    """Federate LDAP Directory with authentik, or create new accounts in LDAP."""
 69
 70    server_uri = models.TextField(
 71        validators=[MultiURLValidator(schemes=["ldap", "ldaps"])],
 72        verbose_name=_("Server URI"),
 73    )
 74    peer_certificate = models.ForeignKey(
 75        CertificateKeyPair,
 76        on_delete=models.SET_DEFAULT,
 77        default=None,
 78        null=True,
 79        related_name="ldap_peer_certificates",
 80        help_text=_(
 81            "Optionally verify the LDAP Server's Certificate against the CA Chain in this keypair."
 82        ),
 83    )
 84    client_certificate = models.ForeignKey(
 85        CertificateKeyPair,
 86        on_delete=models.SET_DEFAULT,
 87        default=None,
 88        null=True,
 89        related_name="ldap_client_certificates",
 90        help_text=_("Client certificate to authenticate against the LDAP Server's Certificate."),
 91    )
 92
 93    bind_cn = models.TextField(verbose_name=_("Bind CN"), blank=True)
 94    bind_password = models.TextField(blank=True)
 95    start_tls = models.BooleanField(default=False, verbose_name=_("Enable Start TLS"))
 96    sni = models.BooleanField(default=False, verbose_name=_("Use Server URI for SNI verification"))
 97
 98    base_dn = models.TextField(verbose_name=_("Base DN"))
 99    additional_user_dn = models.TextField(
100        help_text=_("Prepended to Base DN for User-queries."),
101        verbose_name=_("Addition User DN"),
102        blank=True,
103    )
104    additional_group_dn = models.TextField(
105        help_text=_("Prepended to Base DN for Group-queries."),
106        verbose_name=_("Addition Group DN"),
107        blank=True,
108    )
109
110    user_object_filter = models.TextField(
111        default="(objectClass=person)",
112        help_text=_("Consider Objects matching this filter to be Users."),
113    )
114    user_membership_attribute = models.TextField(
115        default=LDAP_DISTINGUISHED_NAME,
116        help_text=_("Attribute which matches the value of `group_membership_field`."),
117    )
118    group_membership_field = models.TextField(
119        default="member", help_text=_("Field which contains members of a group.")
120    )
121    group_object_filter = models.TextField(
122        default="(objectClass=group)",
123        help_text=_("Consider Objects matching this filter to be Groups."),
124    )
125    object_uniqueness_field = models.TextField(
126        default="objectSid", help_text=_("Field which contains a unique Identifier.")
127    )
128
129    password_login_update_internal_password = models.BooleanField(
130        default=False,
131        help_text=_("Update internal authentik password when login succeeds with LDAP"),
132    )
133
134    sync_users = models.BooleanField(default=True)
135    sync_users_password = models.BooleanField(
136        default=True,
137        help_text=_(
138            "When a user changes their password, sync it back to LDAP. "
139            "This can only be enabled on a single LDAP source."
140        ),
141    )
142    sync_groups = models.BooleanField(default=True)
143    sync_parent_group = models.ForeignKey(
144        Group, blank=True, null=True, default=None, on_delete=models.SET_DEFAULT
145    )
146
147    lookup_groups_from_user = models.BooleanField(
148        default=False,
149        help_text=_(
150            "Lookup group membership based on a user attribute instead of a group attribute. "
151            "This allows nested group resolution on systems like FreeIPA and Active Directory"
152        ),
153    )
154
155    delete_not_found_objects = models.BooleanField(
156        default=False,
157        help_text=_(
158            "Delete authentik users and groups which were previously supplied by this source, "
159            "but are now missing from it."
160        ),
161    )
162
163    @property
164    def component(self) -> str:
165        return "ak-source-ldap-form"
166
167    @property
168    def serializer(self) -> type[Serializer]:
169        from authentik.sources.ldap.api.sources import LDAPSourceSerializer
170
171        return LDAPSourceSerializer
172
173    @property
174    def schedule_specs(self) -> list[ScheduleSpec]:
175        from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync
176
177        return [
178            ScheduleSpec(
179                actor=ldap_sync,
180                uid=self.slug,
181                args=(self.pk,),
182                crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *",
183                send_on_save=True,
184            ),
185            ScheduleSpec(
186                actor=ldap_connectivity_check,
187                uid=self.slug,
188                args=(self.pk,),
189                crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *",
190                send_on_save=True,
191            ),
192        ]
193
194    @property
195    def property_mapping_type(self) -> type[PropertyMapping]:
196        from authentik.sources.ldap.models import LDAPSourcePropertyMapping
197
198        return LDAPSourcePropertyMapping
199
200    def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
201        properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn
202        # TODO: Remove after 2026.5, still stored for legacy
203        if self.object_uniqueness_field in ldap:
204            properties["attributes"][LDAP_UNIQUENESS] = flatten(
205                ldap.get(self.object_uniqueness_field)
206            )
207        return properties
208
209    def get_base_user_properties(self, **kwargs):
210        return self.update_properties_with_uniqueness_field({}, **kwargs)
211
212    def get_base_group_properties(self, **kwargs):
213        return self.update_properties_with_uniqueness_field(
214            {
215                "parent": self.sync_parent_group,
216            },
217            **kwargs,
218        )
219
220    @property
221    def icon_url(self) -> str:
222        return static("authentik/sources/ldap.png")
223
224    def server(self, **kwargs) -> ServerPool:
225        """Get LDAP Server/ServerPool"""
226        servers = []
227        tls_kwargs = {}
228        if self.peer_certificate:
229            tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data
230            tls_kwargs["validate"] = CERT_REQUIRED
231        if self.client_certificate:
232            temp_dir = mkdtemp()
233            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert:
234                temp_cert.write(self.client_certificate.certificate_data)
235                certificate_file = temp_cert.name
236                chmod(certificate_file, 0o600)
237            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key:
238                temp_key.write(self.client_certificate.key_data)
239                private_key_file = temp_key.name
240                chmod(private_key_file, 0o600)
241            tls_kwargs["local_private_key_file"] = private_key_file
242            tls_kwargs["local_certificate_file"] = certificate_file
243        if ciphers := CONFIG.get("ldap.tls.ciphers", None):
244            tls_kwargs["ciphers"] = ciphers.strip()
245        if self.sni:
246            tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip()
247        server_kwargs = {
248            "get_info": ALL,
249            "connect_timeout": LDAP_TIMEOUT,
250            "tls": Tls(**tls_kwargs),
251        }
252        server_kwargs.update(kwargs)
253        if "," in self.server_uri:
254            for server in self.server_uri.split(","):
255                servers.append(Server(server, **server_kwargs))
256        else:
257            servers = [Server(self.server_uri, **server_kwargs)]
258        return ServerPool(servers, RANDOM, active=5, exhaust=True)
259
260    def connection(
261        self,
262        server: Server | None = None,
263        server_kwargs: dict | None = None,
264        connection_kwargs: dict | None = None,
265    ) -> Connection:
266        """Get a fully connected and bound LDAP Connection"""
267        server_kwargs = server_kwargs or {}
268        connection_kwargs = connection_kwargs or {}
269        if self.bind_cn is not None:
270            connection_kwargs.setdefault("user", self.bind_cn)
271        if self.bind_password is not None:
272            connection_kwargs.setdefault("password", self.bind_password)
273        conn = Connection(
274            server or self.server(**server_kwargs),
275            raise_exceptions=True,
276            receive_timeout=LDAP_TIMEOUT,
277            **connection_kwargs,
278        )
279
280        if self.start_tls:
281            LOGGER.debug("Connection StartTLS", source=self)
282            conn.start_tls(read_server_info=False)
283        try:
284            successful = conn.bind()
285            if successful:
286                return conn
287        except (
288            LDAPSchemaError,
289            LDAPInsufficientAccessRightsResult,
290            LDAPAdminLimitExceededResult,
291            LDAPAttributeError,
292        ) as exc:
293            # Schema error or rate limit during schema fetch, retry without schema info
294            # See https://github.com/goauthentik/authentik/issues/4590
295            # See also https://github.com/goauthentik/authentik/issues/3399
296            # LDAPAdminLimitExceededResult: Google Secure LDAP rate-limits schema queries
297            # LDAPAttributeError: Google Secure LDAP returns unsupported attrs in schema
298            if server_kwargs.get("get_info", ALL) == NONE:
299                LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc)
300                raise exc
301            LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc)
302            server_kwargs["get_info"] = NONE
303            return self.connection(server, server_kwargs, connection_kwargs)
304        finally:
305            if conn.server.tls.certificate_file is not None and exists(
306                conn.server.tls.certificate_file
307            ):
308                rmtree(dirname(conn.server.tls.certificate_file))
309        return RuntimeError("Failed to bind")
310
311    @property
312    def sync_lock(self) -> pglock.advisory:
313        """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening"""
314        return pglock.advisory(
315            lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}",
316            timeout=0,
317            side_effect=pglock.Return,
318        )
319
320    def get_ldap_server_info(self, srv: Server) -> dict[str, str]:
321        info = {
322            "vendor": _("N/A"),
323            "version": _("N/A"),
324        }
325        if srv.info:
326            info["vendor"] = str(flatten(srv.info.vendor_name))
327            info["version"] = str(flatten(srv.info.vendor_version))
328        return info
329
330    def check_connection(self) -> dict[str, dict[str, str]]:
331        """Check LDAP Connection"""
332        servers = self.server()
333        server_info = {}
334        # Check each individual server
335        for server in servers.servers:
336            server: Server
337            try:
338                conn = self.connection(server=server)
339                server_info[server.host] = {
340                    "status": "ok",
341                    **self.get_ldap_server_info(conn.server),
342                }
343            except LDAPException as exc:
344                server_info[server.host] = {
345                    "status": str(exc),
346                }
347        # Check server pool
348        try:
349            conn = self.connection()
350            server_info["__all__"] = {
351                "status": "ok",
352                **self.get_ldap_server_info(conn.server),
353            }
354        except LDAPException as exc:
355            server_info["__all__"] = {
356                "status": str(exc),
357            }
358        return server_info
359
360    class Meta:
361        verbose_name = _("LDAP Source")
362        verbose_name_plural = _("LDAP Sources")
363
364
class LDAPSourcePropertyMapping(PropertyMapping):
    """Map LDAP Property to User or Group object attribute"""

    class Meta:
        verbose_name = _("LDAP Source Property Mapping")
        verbose_name_plural = _("LDAP Source Property Mappings")

    def __str__(self):
        # The mapping is displayed by its name.
        return str(self.name)

    @property
    def component(self) -> str:
        """Identifier of the form component associated with this mapping."""
        return "ak-property-mapping-source-ldap-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this mapping (imported lazily inside the property)."""
        from authentik.sources.ldap.api import property_mappings

        return property_mappings.LDAPSourcePropertyMappingSerializer
384
385
class UserLDAPSourceConnection(UserSourceConnection):
    """User source connection that additionally carries a `validated_by` UUID."""

    class Meta:
        verbose_name = _("User LDAP Source Connection")
        verbose_name_plural = _("User LDAP Source Connections")
        indexes = [models.Index(fields=["validated_by"])]

    validated_by = models.UUIDField(
        help_text=_("Unique ID used while checking if this object still exists in the directory."),
        blank=True,
        null=True,
    )

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this connection (imported lazily inside the property)."""
        from authentik.sources.ldap.api import connections

        return connections.UserLDAPSourceConnectionSerializer
407
408
class GroupLDAPSourceConnection(GroupSourceConnection):
    """Group source connection that additionally carries a `validated_by` UUID."""

    class Meta:
        verbose_name = _("Group LDAP Source Connection")
        verbose_name_plural = _("Group LDAP Source Connections")
        indexes = [models.Index(fields=["validated_by"])]

    validated_by = models.UUIDField(
        help_text=_("Unique ID used while checking if this object still exists in the directory."),
        blank=True,
        null=True,
    )

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this connection (imported lazily inside the property)."""
        from authentik.sources.ldap.api import connections

        return connections.GroupLDAPSourceConnectionSerializer
LDAP_TIMEOUT = 15
LDAP_UNIQUENESS = 'ldap_uniq'

Deprecated, don't use

LDAP_DISTINGUISHED_NAME = 'distinguishedName'
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def flatten(value: Any) -> Any:
46def flatten(value: Any) -> Any:
47    """Flatten `value` if its a list, set or tuple"""
48    if isinstance(value, list | set | tuple):
49        if len(value) < 1:
50            return None
51        if isinstance(value, set):
52            return value.pop()
53        return value[0]
54    return value

Flatten value if it's a list, set or tuple

class MultiURLValidator(authentik.lib.models.DomainlessURLValidator):
class MultiURLValidator(DomainlessURLValidator):
    """Same as DomainlessURLValidator but supports multiple URLs separated with a comma."""

    def __call__(self, value: str):
        # A value without a comma is validated as a single URL; otherwise every
        # comma-separated part is validated on its own, in order.
        candidates = value.split(",") if "," in value else [value]
        for candidate in candidates:
            super().__call__(candidate)

Same as DomainlessURLValidator but supports multiple URLs separated with a comma.

 68class LDAPSource(IncomingSyncSource):
 69    """Federate LDAP Directory with authentik, or create new accounts in LDAP."""
 70
 71    server_uri = models.TextField(
 72        validators=[MultiURLValidator(schemes=["ldap", "ldaps"])],
 73        verbose_name=_("Server URI"),
 74    )
 75    peer_certificate = models.ForeignKey(
 76        CertificateKeyPair,
 77        on_delete=models.SET_DEFAULT,
 78        default=None,
 79        null=True,
 80        related_name="ldap_peer_certificates",
 81        help_text=_(
 82            "Optionally verify the LDAP Server's Certificate against the CA Chain in this keypair."
 83        ),
 84    )
 85    client_certificate = models.ForeignKey(
 86        CertificateKeyPair,
 87        on_delete=models.SET_DEFAULT,
 88        default=None,
 89        null=True,
 90        related_name="ldap_client_certificates",
 91        help_text=_("Client certificate to authenticate against the LDAP Server's Certificate."),
 92    )
 93
 94    bind_cn = models.TextField(verbose_name=_("Bind CN"), blank=True)
 95    bind_password = models.TextField(blank=True)
 96    start_tls = models.BooleanField(default=False, verbose_name=_("Enable Start TLS"))
 97    sni = models.BooleanField(default=False, verbose_name=_("Use Server URI for SNI verification"))
 98
 99    base_dn = models.TextField(verbose_name=_("Base DN"))
100    additional_user_dn = models.TextField(
101        help_text=_("Prepended to Base DN for User-queries."),
102        verbose_name=_("Addition User DN"),
103        blank=True,
104    )
105    additional_group_dn = models.TextField(
106        help_text=_("Prepended to Base DN for Group-queries."),
107        verbose_name=_("Addition Group DN"),
108        blank=True,
109    )
110
111    user_object_filter = models.TextField(
112        default="(objectClass=person)",
113        help_text=_("Consider Objects matching this filter to be Users."),
114    )
115    user_membership_attribute = models.TextField(
116        default=LDAP_DISTINGUISHED_NAME,
117        help_text=_("Attribute which matches the value of `group_membership_field`."),
118    )
119    group_membership_field = models.TextField(
120        default="member", help_text=_("Field which contains members of a group.")
121    )
122    group_object_filter = models.TextField(
123        default="(objectClass=group)",
124        help_text=_("Consider Objects matching this filter to be Groups."),
125    )
126    object_uniqueness_field = models.TextField(
127        default="objectSid", help_text=_("Field which contains a unique Identifier.")
128    )
129
130    password_login_update_internal_password = models.BooleanField(
131        default=False,
132        help_text=_("Update internal authentik password when login succeeds with LDAP"),
133    )
134
135    sync_users = models.BooleanField(default=True)
136    sync_users_password = models.BooleanField(
137        default=True,
138        help_text=_(
139            "When a user changes their password, sync it back to LDAP. "
140            "This can only be enabled on a single LDAP source."
141        ),
142    )
143    sync_groups = models.BooleanField(default=True)
144    sync_parent_group = models.ForeignKey(
145        Group, blank=True, null=True, default=None, on_delete=models.SET_DEFAULT
146    )
147
148    lookup_groups_from_user = models.BooleanField(
149        default=False,
150        help_text=_(
151            "Lookup group membership based on a user attribute instead of a group attribute. "
152            "This allows nested group resolution on systems like FreeIPA and Active Directory"
153        ),
154    )
155
156    delete_not_found_objects = models.BooleanField(
157        default=False,
158        help_text=_(
159            "Delete authentik users and groups which were previously supplied by this source, "
160            "but are now missing from it."
161        ),
162    )
163
164    @property
165    def component(self) -> str:
166        return "ak-source-ldap-form"
167
168    @property
169    def serializer(self) -> type[Serializer]:
170        from authentik.sources.ldap.api.sources import LDAPSourceSerializer
171
172        return LDAPSourceSerializer
173
174    @property
175    def schedule_specs(self) -> list[ScheduleSpec]:
176        from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync
177
178        return [
179            ScheduleSpec(
180                actor=ldap_sync,
181                uid=self.slug,
182                args=(self.pk,),
183                crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *",
184                send_on_save=True,
185            ),
186            ScheduleSpec(
187                actor=ldap_connectivity_check,
188                uid=self.slug,
189                args=(self.pk,),
190                crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *",
191                send_on_save=True,
192            ),
193        ]
194
195    @property
196    def property_mapping_type(self) -> type[PropertyMapping]:
197        from authentik.sources.ldap.models import LDAPSourcePropertyMapping
198
199        return LDAPSourcePropertyMapping
200
201    def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
202        properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn
203        # TODO: Remove after 2026.5, still stored for legacy
204        if self.object_uniqueness_field in ldap:
205            properties["attributes"][LDAP_UNIQUENESS] = flatten(
206                ldap.get(self.object_uniqueness_field)
207            )
208        return properties
209
210    def get_base_user_properties(self, **kwargs):
211        return self.update_properties_with_uniqueness_field({}, **kwargs)
212
213    def get_base_group_properties(self, **kwargs):
214        return self.update_properties_with_uniqueness_field(
215            {
216                "parent": self.sync_parent_group,
217            },
218            **kwargs,
219        )
220
221    @property
222    def icon_url(self) -> str:
223        return static("authentik/sources/ldap.png")
224
225    def server(self, **kwargs) -> ServerPool:
226        """Get LDAP Server/ServerPool"""
227        servers = []
228        tls_kwargs = {}
229        if self.peer_certificate:
230            tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data
231            tls_kwargs["validate"] = CERT_REQUIRED
232        if self.client_certificate:
233            temp_dir = mkdtemp()
234            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert:
235                temp_cert.write(self.client_certificate.certificate_data)
236                certificate_file = temp_cert.name
237                chmod(certificate_file, 0o600)
238            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key:
239                temp_key.write(self.client_certificate.key_data)
240                private_key_file = temp_key.name
241                chmod(private_key_file, 0o600)
242            tls_kwargs["local_private_key_file"] = private_key_file
243            tls_kwargs["local_certificate_file"] = certificate_file
244        if ciphers := CONFIG.get("ldap.tls.ciphers", None):
245            tls_kwargs["ciphers"] = ciphers.strip()
246        if self.sni:
247            tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip()
248        server_kwargs = {
249            "get_info": ALL,
250            "connect_timeout": LDAP_TIMEOUT,
251            "tls": Tls(**tls_kwargs),
252        }
253        server_kwargs.update(kwargs)
254        if "," in self.server_uri:
255            for server in self.server_uri.split(","):
256                servers.append(Server(server, **server_kwargs))
257        else:
258            servers = [Server(self.server_uri, **server_kwargs)]
259        return ServerPool(servers, RANDOM, active=5, exhaust=True)
260
261    def connection(
262        self,
263        server: Server | None = None,
264        server_kwargs: dict | None = None,
265        connection_kwargs: dict | None = None,
266    ) -> Connection:
267        """Get a fully connected and bound LDAP Connection"""
268        server_kwargs = server_kwargs or {}
269        connection_kwargs = connection_kwargs or {}
270        if self.bind_cn is not None:
271            connection_kwargs.setdefault("user", self.bind_cn)
272        if self.bind_password is not None:
273            connection_kwargs.setdefault("password", self.bind_password)
274        conn = Connection(
275            server or self.server(**server_kwargs),
276            raise_exceptions=True,
277            receive_timeout=LDAP_TIMEOUT,
278            **connection_kwargs,
279        )
280
281        if self.start_tls:
282            LOGGER.debug("Connection StartTLS", source=self)
283            conn.start_tls(read_server_info=False)
284        try:
285            successful = conn.bind()
286            if successful:
287                return conn
288        except (
289            LDAPSchemaError,
290            LDAPInsufficientAccessRightsResult,
291            LDAPAdminLimitExceededResult,
292            LDAPAttributeError,
293        ) as exc:
294            # Schema error or rate limit during schema fetch, retry without schema info
295            # See https://github.com/goauthentik/authentik/issues/4590
296            # See also https://github.com/goauthentik/authentik/issues/3399
297            # LDAPAdminLimitExceededResult: Google Secure LDAP rate-limits schema queries
298            # LDAPAttributeError: Google Secure LDAP returns unsupported attrs in schema
299            if server_kwargs.get("get_info", ALL) == NONE:
300                LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc)
301                raise exc
302            LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc)
303            server_kwargs["get_info"] = NONE
304            return self.connection(server, server_kwargs, connection_kwargs)
305        finally:
306            if conn.server.tls.certificate_file is not None and exists(
307                conn.server.tls.certificate_file
308            ):
309                rmtree(dirname(conn.server.tls.certificate_file))
310        return RuntimeError("Failed to bind")
311
312    @property
313    def sync_lock(self) -> pglock.advisory:
314        """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening"""
315        return pglock.advisory(
316            lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}",
317            timeout=0,
318            side_effect=pglock.Return,
319        )
320
321    def get_ldap_server_info(self, srv: Server) -> dict[str, str]:
322        info = {
323            "vendor": _("N/A"),
324            "version": _("N/A"),
325        }
326        if srv.info:
327            info["vendor"] = str(flatten(srv.info.vendor_name))
328            info["version"] = str(flatten(srv.info.vendor_version))
329        return info
330
331    def check_connection(self) -> dict[str, dict[str, str]]:
332        """Check LDAP Connection"""
333        servers = self.server()
334        server_info = {}
335        # Check each individual server
336        for server in servers.servers:
337            server: Server
338            try:
339                conn = self.connection(server=server)
340                server_info[server.host] = {
341                    "status": "ok",
342                    **self.get_ldap_server_info(conn.server),
343                }
344            except LDAPException as exc:
345                server_info[server.host] = {
346                    "status": str(exc),
347                }
348        # Check server pool
349        try:
350            conn = self.connection()
351            server_info["__all__"] = {
352                "status": "ok",
353                **self.get_ldap_server_info(conn.server),
354            }
355        except LDAPException as exc:
356            server_info["__all__"] = {
357                "status": str(exc),
358            }
359        return server_info
360
361    class Meta:
362        verbose_name = _("LDAP Source")
363        verbose_name_plural = _("LDAP Sources")

Federate LDAP Directory with authentik, or create new accounts in LDAP.

def server_uri(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

peer_certificate

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

client_certificate

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def bind_cn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def bind_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def start_tls(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sni(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def base_dn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def additional_user_dn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def additional_group_dn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user_object_filter(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user_membership_attribute(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def group_membership_field(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def group_object_filter(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def object_uniqueness_field(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def password_login_update_internal_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_users(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_users_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_groups(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

sync_parent_group

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def lookup_groups_from_user(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def delete_not_found_objects(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

component: str
164    @property
165    def component(self) -> str:
166        return "ak-source-ldap-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
168    @property
169    def serializer(self) -> type[Serializer]:
170        from authentik.sources.ldap.api.sources import LDAPSourceSerializer
171
172        return LDAPSourceSerializer

Get serializer for this model

schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
@property
def schedule_specs(self) -> list[ScheduleSpec]:
    """Return the task schedules attached to this source.

    Two specs are returned, both using the source slug as uid and the primary
    key as the only argument: one for ``ldap_sync`` and one for
    ``ldap_connectivity_check``. The first crontab field of each is produced by
    ``fqdn_rand`` from the task name and the primary key.
    """
    from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync

    sync_slot = fqdn_rand("ldap_sync/" + str(self.pk))
    sync_spec = ScheduleSpec(
        actor=ldap_sync,
        uid=self.slug,
        args=(self.pk,),
        crontab=f"{sync_slot} */2 * * *",
        send_on_save=True,
    )

    check_slot = fqdn_rand("ldap_connectivity_check/" + str(self.pk))
    check_spec = ScheduleSpec(
        actor=ldap_connectivity_check,
        uid=self.slug,
        args=(self.pk,),
        crontab=f"{check_slot} * * * *",
        send_on_save=True,
    )
    return [sync_spec, check_spec]
property_mapping_type: type[authentik.core.models.PropertyMapping]
195    @property
196    def property_mapping_type(self) -> type[PropertyMapping]:
197        from authentik.sources.ldap.models import LDAPSourcePropertyMapping
198
199        return LDAPSourcePropertyMapping

Return property mapping type used by this object

def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
    """Record the distinguished name (and legacy uniqueness value) on `properties`.

    ``properties["attributes"]`` is created when missing and receives ``dn``
    under ``LDAP_DISTINGUISHED_NAME``. When the configured
    ``object_uniqueness_field`` is present in ``ldap``, its flattened value is
    stored under ``LDAP_UNIQUENESS`` as well. The same ``properties`` object is
    modified in place and returned; extra keyword arguments are ignored.
    """
    attributes = properties.setdefault("attributes", {})
    attributes[LDAP_DISTINGUISHED_NAME] = dn
    # TODO: Remove after 2026.5, still stored for legacy
    uniqueness_field = self.object_uniqueness_field
    if uniqueness_field in ldap:
        attributes[LDAP_UNIQUENESS] = flatten(ldap.get(uniqueness_field))
    return properties
def get_base_user_properties(self, **kwargs):
210    def get_base_user_properties(self, **kwargs):
211        return self.update_properties_with_uniqueness_field({}, **kwargs)

Get base properties for a user to build final properties upon.

def get_base_group_properties(self, **kwargs):
213    def get_base_group_properties(self, **kwargs):
214        return self.update_properties_with_uniqueness_field(
215            {
216                "parent": self.sync_parent_group,
217            },
218            **kwargs,
219        )

Get base properties for a group to build final properties upon.

icon_url: str
221    @property
222    def icon_url(self) -> str:
223        return static("authentik/sources/ldap.png")

Get the URL to the source icon

def server(self, **kwargs) -> ldap3.core.pooling.ServerPool:
def server(self, **kwargs) -> ServerPool:
    """Get LDAP Server/ServerPool

    TLS settings are assembled from the peer certificate, the client
    certificate, the ``ldap.tls.ciphers`` config value and the ``sni`` flag.
    One ``Server`` is created per comma-separated entry of ``server_uri`` and
    all of them are wrapped in a ``ServerPool``. Keyword arguments passed in
    override the default ``Server`` arguments.

    When a client certificate is configured, its certificate and key are
    written to files inside a newly created temporary directory; this method
    does not delete them.
    """
    tls_options = {}
    if self.peer_certificate:
        tls_options["ca_certs_data"] = self.peer_certificate.certificate_data
        tls_options["validate"] = CERT_REQUIRED
    if self.client_certificate:
        scratch_dir = mkdtemp()
        written_paths = []
        # Certificate first, then key; each file is restricted to its owner
        for attribute in ("certificate_data", "key_data"):
            with NamedTemporaryFile(mode="w", delete=False, dir=scratch_dir) as handle:
                handle.write(getattr(self.client_certificate, attribute))
                chmod(handle.name, 0o600)
            written_paths.append(handle.name)
        certificate_file, private_key_file = written_paths
        tls_options["local_private_key_file"] = private_key_file
        tls_options["local_certificate_file"] = certificate_file
    ciphers = CONFIG.get("ldap.tls.ciphers", None)
    if ciphers:
        tls_options["ciphers"] = ciphers.strip()
    if self.sni:
        # SNI always uses the first configured URI
        first_uri = self.server_uri.split(",", maxsplit=1)[0]
        tls_options["sni"] = first_uri.strip()
    server_options = {
        "get_info": ALL,
        "connect_timeout": LDAP_TIMEOUT,
        "tls": Tls(**tls_options),
        **kwargs,
    }
    # A URI without a comma splits into a single-element list
    pool_members = [Server(uri, **server_options) for uri in self.server_uri.split(",")]
    return ServerPool(pool_members, RANDOM, active=5, exhaust=True)

Get LDAP Server/ServerPool

def connection( self, server: ldap3.core.server.Server | None = None, server_kwargs: dict | None = None, connection_kwargs: dict | None = None) -> ldap3.core.connection.Connection:
def connection(
    self,
    server: Server | None = None,
    server_kwargs: dict | None = None,
    connection_kwargs: dict | None = None,
) -> Connection:
    """Get a fully connected and bound LDAP Connection

    Args:
        server: Server to connect to. When omitted, the result of
            ``self.server(**server_kwargs)`` is used.
        server_kwargs: Extra keyword arguments for ``self.server``.
        connection_kwargs: Extra keyword arguments for ``Connection``. ``user``
            and ``password`` default to ``bind_cn`` / ``bind_password`` when
            those are not None.

    Returns:
        The bound connection.

    Raises:
        RuntimeError: If the bind reports failure without raising.
        LDAPSchemaError, LDAPInsufficientAccessRightsResult,
        LDAPAdminLimitExceededResult, LDAPAttributeError: If the bind still
            fails after retrying with ``get_info=NONE``.
    """
    # Work on copies so the caller's dictionaries are never modified
    server_kwargs = dict(server_kwargs or {})
    connection_kwargs = dict(connection_kwargs or {})
    if self.bind_cn is not None:
        connection_kwargs.setdefault("user", self.bind_cn)
    if self.bind_password is not None:
        connection_kwargs.setdefault("password", self.bind_password)
    conn = Connection(
        server or self.server(**server_kwargs),
        raise_exceptions=True,
        receive_timeout=LDAP_TIMEOUT,
        **connection_kwargs,
    )

    try:
        if self.start_tls:
            LOGGER.debug("Connection StartTLS", source=self)
            conn.start_tls(read_server_info=False)
        try:
            successful = conn.bind()
            if successful:
                return conn
        except (
            LDAPSchemaError,
            LDAPInsufficientAccessRightsResult,
            LDAPAdminLimitExceededResult,
            LDAPAttributeError,
        ) as exc:
            # Schema error or rate limit during schema fetch, retry without schema info
            # See https://github.com/goauthentik/authentik/issues/4590
            # See also https://github.com/goauthentik/authentik/issues/3399
            # LDAPAdminLimitExceededResult: Google Secure LDAP rate-limits schema queries
            # LDAPAttributeError: Google Secure LDAP returns unsupported attrs in schema
            if server_kwargs.get("get_info", ALL) == NONE:
                LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc)
                raise exc
            LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc)
            server_kwargs["get_info"] = NONE
            return self.connection(server, server_kwargs, connection_kwargs)
    finally:
        # Remove the temporary client certificate directory, also when
        # StartTLS or the bind failed
        if conn.server.tls.certificate_file is not None and exists(
            conn.server.tls.certificate_file
        ):
            rmtree(dirname(conn.server.tls.certificate_file))
    # bind() returned a falsy result without raising
    raise RuntimeError("Failed to bind")

Get a fully connected and bound LDAP Connection

sync_lock: pglock.core.advisory
312    @property
313    def sync_lock(self) -> pglock.advisory:
314        """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening"""
315        return pglock.advisory(
316            lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}",
317            timeout=0,
318            side_effect=pglock.Return,
319        )

Postgres lock for syncing LDAP to prevent multiple parallel syncs happening

def get_ldap_server_info(self, srv: ldap3.core.server.Server) -> dict[str, str]:
def get_ldap_server_info(self, srv: Server) -> dict[str, str]:
    """Return the vendor name and version reported by `srv`.

    When ``srv.info`` is falsy both entries are the translated "N/A"
    placeholder; otherwise they are the stringified, flattened
    ``vendor_name`` and ``vendor_version`` of that info object.
    """
    reported = srv.info
    if not reported:
        return {
            "vendor": _("N/A"),
            "version": _("N/A"),
        }
    return {
        "vendor": str(flatten(reported.vendor_name)),
        "version": str(flatten(reported.vendor_version)),
    }
def check_connection(self) -> dict[str, dict[str, str]]:
def check_connection(self) -> dict[str, dict[str, str]]:
    """Check LDAP Connection

    Every server of the pool is probed on its own and reported under its
    host name; the pool as a whole is reported under ``"__all__"``. Each
    entry holds ``"status": "ok"`` plus the server info on success, or the
    stringified ``LDAPException`` as status on failure.
    """

    def probe(**connect_args) -> dict[str, str]:
        """Connect with the given arguments and describe the outcome."""
        try:
            bound = self.connection(**connect_args)
            return {
                "status": "ok",
                **self.get_ldap_server_info(bound.server),
            }
        except LDAPException as exc:
            return {
                "status": str(exc),
            }

    report = {}
    # Check each individual server
    for member in self.server().servers:
        report[member.host] = probe(server=member)
    # Check server pool
    report["__all__"] = probe()
    return report

Check LDAP Connection

def sync_outgoing_trigger_mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

schedules

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

peer_certificate_id
client_certificate_id
sync_parent_group_id
source_ptr_id
source_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

def get_sync_outgoing_trigger_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

class LDAPSource.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class LDAPSource.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class LDAPSourcePropertyMapping(authentik.core.models.PropertyMapping):
366class LDAPSourcePropertyMapping(PropertyMapping):
367    """Map LDAP Property to User or Group object attribute"""
368
369    @property
370    def component(self) -> str:
371        return "ak-property-mapping-source-ldap-form"
372
373    @property
374    def serializer(self) -> type[Serializer]:
375        from authentik.sources.ldap.api.property_mappings import LDAPSourcePropertyMappingSerializer
376
377        return LDAPSourcePropertyMappingSerializer
378
379    def __str__(self):
380        return str(self.name)
381
382    class Meta:
383        verbose_name = _("LDAP Source Property Mapping")
384        verbose_name_plural = _("LDAP Source Property Mappings")

Map LDAP Property to User or Group object attribute

component: str
369    @property
370    def component(self) -> str:
371        return "ak-property-mapping-source-ldap-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
373    @property
374    def serializer(self) -> type[Serializer]:
375        from authentik.sources.ldap.api.property_mappings import LDAPSourcePropertyMappingSerializer
376
377        return LDAPSourcePropertyMappingSerializer

Get serializer for this model

propertymapping_ptr_id
propertymapping_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class LDAPSourcePropertyMapping.DoesNotExist(authentik.core.models.PropertyMapping.DoesNotExist):

The requested object does not exist

class LDAPSourcePropertyMapping.MultipleObjectsReturned(authentik.core.models.PropertyMapping.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class UserLDAPSourceConnection(authentik.core.models.UserSourceConnection):
387class UserLDAPSourceConnection(UserSourceConnection):
388    validated_by = models.UUIDField(
389        null=True,
390        blank=True,
391        help_text=_("Unique ID used while checking if this object still exists in the directory."),
392    )
393
394    @property
395    def serializer(self) -> type[Serializer]:
396        from authentik.sources.ldap.api.connections import (
397            UserLDAPSourceConnectionSerializer,
398        )
399
400        return UserLDAPSourceConnectionSerializer
401
402    class Meta:
403        verbose_name = _("User LDAP Source Connection")
404        verbose_name_plural = _("User LDAP Source Connections")
405        indexes = [
406            models.Index(fields=["validated_by"]),
407        ]

UserLDAPSourceConnection(id, created, last_updated, user, source, identifier, usersourceconnection_ptr, validated_by)

def validated_by(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.Serializer]
394    @property
395    def serializer(self) -> type[Serializer]:
396        from authentik.sources.ldap.api.connections import (
397            UserLDAPSourceConnectionSerializer,
398        )
399
400        return UserLDAPSourceConnectionSerializer

Get serializer for this model

usersourceconnection_ptr_id
usersourceconnection_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class UserLDAPSourceConnection.DoesNotExist(authentik.core.models.UserSourceConnection.DoesNotExist):

The requested object does not exist

class UserLDAPSourceConnection.MultipleObjectsReturned(authentik.core.models.UserSourceConnection.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class GroupLDAPSourceConnection(authentik.core.models.GroupSourceConnection):
410class GroupLDAPSourceConnection(GroupSourceConnection):
411    validated_by = models.UUIDField(
412        null=True,
413        blank=True,
414        help_text=_("Unique ID used while checking if this object still exists in the directory."),
415    )
416
417    @property
418    def serializer(self) -> type[Serializer]:
419        from authentik.sources.ldap.api.connections import (
420            GroupLDAPSourceConnectionSerializer,
421        )
422
423        return GroupLDAPSourceConnectionSerializer
424
425    class Meta:
426        verbose_name = _("Group LDAP Source Connection")
427        verbose_name_plural = _("Group LDAP Source Connections")
428        indexes = [
429            models.Index(fields=["validated_by"]),
430        ]

GroupLDAPSourceConnection(id, created, last_updated, group, source, identifier, groupsourceconnection_ptr, validated_by)

def validated_by(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.Serializer]
417    @property
418    def serializer(self) -> type[Serializer]:
419        from authentik.sources.ldap.api.connections import (
420            GroupLDAPSourceConnectionSerializer,
421        )
422
423        return GroupLDAPSourceConnectionSerializer

Get serializer for this model

groupsourceconnection_ptr_id
groupsourceconnection_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class GroupLDAPSourceConnection.DoesNotExist(authentik.core.models.GroupSourceConnection.DoesNotExist):

The requested object does not exist

class GroupLDAPSourceConnection.MultipleObjectsReturned(authentik.core.models.GroupSourceConnection.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.