authentik.sources.ldap.models

authentik LDAP Models

  1"""authentik LDAP Models"""
  2
  3from os import chmod
  4from os.path import dirname, exists
  5from shutil import rmtree
  6from ssl import CERT_REQUIRED
  7from tempfile import NamedTemporaryFile, mkdtemp
  8from typing import Any
  9
 10import pglock
 11from django.db import connection, models
 12from django.templatetags.static import static
 13from django.utils.translation import gettext_lazy as _
 14from ldap3 import ALL, NONE, RANDOM, Connection, Server, ServerPool, Tls
 15from ldap3.core.exceptions import LDAPException, LDAPInsufficientAccessRightsResult, LDAPSchemaError
 16from rest_framework.serializers import Serializer
 17from structlog.stdlib import get_logger
 18
 19from authentik.core.models import (
 20    Group,
 21    GroupSourceConnection,
 22    PropertyMapping,
 23    UserSourceConnection,
 24)
 25from authentik.crypto.models import CertificateKeyPair
 26from authentik.lib.config import CONFIG
 27from authentik.lib.models import DomainlessURLValidator
 28from authentik.lib.sync.incoming.models import IncomingSyncSource
 29from authentik.lib.utils.time import fqdn_rand
 30from authentik.tasks.schedules.common import ScheduleSpec
 31
 32LDAP_TIMEOUT = 15
 33LDAP_UNIQUENESS = "ldap_uniq"
 34LDAP_DISTINGUISHED_NAME = "distinguishedName"
 35LOGGER = get_logger()
 36
 37
 38def flatten(value: Any) -> Any:
 39    """Flatten `value` if its a list, set or tuple"""
 40    if isinstance(value, list | set | tuple):
 41        if len(value) < 1:
 42            return None
 43        if isinstance(value, set):
 44            return value.pop()
 45        return value[0]
 46    return value
 47
 48
 49class MultiURLValidator(DomainlessURLValidator):
 50    """Same as DomainlessURLValidator but supports multiple URLs separated with a comma."""
 51
 52    def __call__(self, value: str):
 53        if "," in value:
 54            for url in value.split(","):
 55                super().__call__(url)
 56        else:
 57            super().__call__(value)
 58
 59
 60class LDAPSource(IncomingSyncSource):
 61    """Federate LDAP Directory with authentik, or create new accounts in LDAP."""
 62
 63    server_uri = models.TextField(
 64        validators=[MultiURLValidator(schemes=["ldap", "ldaps"])],
 65        verbose_name=_("Server URI"),
 66    )
 67    peer_certificate = models.ForeignKey(
 68        CertificateKeyPair,
 69        on_delete=models.SET_DEFAULT,
 70        default=None,
 71        null=True,
 72        related_name="ldap_peer_certificates",
 73        help_text=_(
 74            "Optionally verify the LDAP Server's Certificate against the CA Chain in this keypair."
 75        ),
 76    )
 77    client_certificate = models.ForeignKey(
 78        CertificateKeyPair,
 79        on_delete=models.SET_DEFAULT,
 80        default=None,
 81        null=True,
 82        related_name="ldap_client_certificates",
 83        help_text=_("Client certificate to authenticate against the LDAP Server's Certificate."),
 84    )
 85
 86    bind_cn = models.TextField(verbose_name=_("Bind CN"), blank=True)
 87    bind_password = models.TextField(blank=True)
 88    start_tls = models.BooleanField(default=False, verbose_name=_("Enable Start TLS"))
 89    sni = models.BooleanField(default=False, verbose_name=_("Use Server URI for SNI verification"))
 90
 91    base_dn = models.TextField(verbose_name=_("Base DN"))
 92    additional_user_dn = models.TextField(
 93        help_text=_("Prepended to Base DN for User-queries."),
 94        verbose_name=_("Addition User DN"),
 95        blank=True,
 96    )
 97    additional_group_dn = models.TextField(
 98        help_text=_("Prepended to Base DN for Group-queries."),
 99        verbose_name=_("Addition Group DN"),
100        blank=True,
101    )
102
103    user_object_filter = models.TextField(
104        default="(objectClass=person)",
105        help_text=_("Consider Objects matching this filter to be Users."),
106    )
107    user_membership_attribute = models.TextField(
108        default=LDAP_DISTINGUISHED_NAME,
109        help_text=_("Attribute which matches the value of `group_membership_field`."),
110    )
111    group_membership_field = models.TextField(
112        default="member", help_text=_("Field which contains members of a group.")
113    )
114    group_object_filter = models.TextField(
115        default="(objectClass=group)",
116        help_text=_("Consider Objects matching this filter to be Groups."),
117    )
118    object_uniqueness_field = models.TextField(
119        default="objectSid", help_text=_("Field which contains a unique Identifier.")
120    )
121
122    password_login_update_internal_password = models.BooleanField(
123        default=False,
124        help_text=_("Update internal authentik password when login succeeds with LDAP"),
125    )
126
127    sync_users = models.BooleanField(default=True)
128    sync_users_password = models.BooleanField(
129        default=True,
130        help_text=_(
131            "When a user changes their password, sync it back to LDAP. "
132            "This can only be enabled on a single LDAP source."
133        ),
134    )
135    sync_groups = models.BooleanField(default=True)
136    sync_parent_group = models.ForeignKey(
137        Group, blank=True, null=True, default=None, on_delete=models.SET_DEFAULT
138    )
139
140    lookup_groups_from_user = models.BooleanField(
141        default=False,
142        help_text=_(
143            "Lookup group membership based on a user attribute instead of a group attribute. "
144            "This allows nested group resolution on systems like FreeIPA and Active Directory"
145        ),
146    )
147
148    delete_not_found_objects = models.BooleanField(
149        default=False,
150        help_text=_(
151            "Delete authentik users and groups which were previously supplied by this source, "
152            "but are now missing from it."
153        ),
154    )
155
156    @property
157    def component(self) -> str:
158        return "ak-source-ldap-form"
159
160    @property
161    def serializer(self) -> type[Serializer]:
162        from authentik.sources.ldap.api import LDAPSourceSerializer
163
164        return LDAPSourceSerializer
165
166    @property
167    def schedule_specs(self) -> list[ScheduleSpec]:
168        from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync
169
170        return [
171            ScheduleSpec(
172                actor=ldap_sync,
173                uid=self.slug,
174                args=(self.pk,),
175                crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *",
176                send_on_save=True,
177            ),
178            ScheduleSpec(
179                actor=ldap_connectivity_check,
180                uid=self.slug,
181                args=(self.pk,),
182                crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *",
183                send_on_save=True,
184            ),
185        ]
186
187    @property
188    def property_mapping_type(self) -> type[PropertyMapping]:
189        from authentik.sources.ldap.models import LDAPSourcePropertyMapping
190
191        return LDAPSourcePropertyMapping
192
193    def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
194        properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn
195        if self.object_uniqueness_field in ldap:
196            properties["attributes"][LDAP_UNIQUENESS] = flatten(
197                ldap.get(self.object_uniqueness_field)
198            )
199        return properties
200
201    def get_base_user_properties(self, **kwargs):
202        return self.update_properties_with_uniqueness_field({}, **kwargs)
203
204    def get_base_group_properties(self, **kwargs):
205        return self.update_properties_with_uniqueness_field(
206            {
207                "parent": self.sync_parent_group,
208            },
209            **kwargs,
210        )
211
212    @property
213    def icon_url(self) -> str:
214        return static("authentik/sources/ldap.png")
215
216    def server(self, **kwargs) -> ServerPool:
217        """Get LDAP Server/ServerPool"""
218        servers = []
219        tls_kwargs = {}
220        if self.peer_certificate:
221            tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data
222            tls_kwargs["validate"] = CERT_REQUIRED
223        if self.client_certificate:
224            temp_dir = mkdtemp()
225            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert:
226                temp_cert.write(self.client_certificate.certificate_data)
227                certificate_file = temp_cert.name
228                chmod(certificate_file, 0o600)
229            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key:
230                temp_key.write(self.client_certificate.key_data)
231                private_key_file = temp_key.name
232                chmod(private_key_file, 0o600)
233            tls_kwargs["local_private_key_file"] = private_key_file
234            tls_kwargs["local_certificate_file"] = certificate_file
235        if ciphers := CONFIG.get("ldap.tls.ciphers", None):
236            tls_kwargs["ciphers"] = ciphers.strip()
237        if self.sni:
238            tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip()
239        server_kwargs = {
240            "get_info": ALL,
241            "connect_timeout": LDAP_TIMEOUT,
242            "tls": Tls(**tls_kwargs),
243        }
244        server_kwargs.update(kwargs)
245        if "," in self.server_uri:
246            for server in self.server_uri.split(","):
247                servers.append(Server(server, **server_kwargs))
248        else:
249            servers = [Server(self.server_uri, **server_kwargs)]
250        return ServerPool(servers, RANDOM, active=5, exhaust=True)
251
252    def connection(
253        self,
254        server: Server | None = None,
255        server_kwargs: dict | None = None,
256        connection_kwargs: dict | None = None,
257    ) -> Connection:
258        """Get a fully connected and bound LDAP Connection"""
259        server_kwargs = server_kwargs or {}
260        connection_kwargs = connection_kwargs or {}
261        if self.bind_cn is not None:
262            connection_kwargs.setdefault("user", self.bind_cn)
263        if self.bind_password is not None:
264            connection_kwargs.setdefault("password", self.bind_password)
265        conn = Connection(
266            server or self.server(**server_kwargs),
267            raise_exceptions=True,
268            receive_timeout=LDAP_TIMEOUT,
269            **connection_kwargs,
270        )
271
272        if self.start_tls:
273            LOGGER.debug("Connection StartTLS", source=self)
274            conn.start_tls(read_server_info=False)
275        try:
276            successful = conn.bind()
277            if successful:
278                return conn
279        except (LDAPSchemaError, LDAPInsufficientAccessRightsResult) as exc:
280            # Schema error, so try connecting without schema info
281            # See https://github.com/goauthentik/authentik/issues/4590
282            # See also https://github.com/goauthentik/authentik/issues/3399
283            if server_kwargs.get("get_info", ALL) == NONE:
284                LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc)
285                raise exc
286            LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc)
287            server_kwargs["get_info"] = NONE
288            return self.connection(server, server_kwargs, connection_kwargs)
289        finally:
290            if conn.server.tls.certificate_file is not None and exists(
291                conn.server.tls.certificate_file
292            ):
293                rmtree(dirname(conn.server.tls.certificate_file))
294        return RuntimeError("Failed to bind")
295
296    @property
297    def sync_lock(self) -> pglock.advisory:
298        """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening"""
299        return pglock.advisory(
300            lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}",
301            timeout=0,
302            side_effect=pglock.Return,
303        )
304
305    def get_ldap_server_info(self, srv: Server) -> dict[str, str]:
306        info = {
307            "vendor": _("N/A"),
308            "version": _("N/A"),
309        }
310        if srv.info:
311            info["vendor"] = str(flatten(srv.info.vendor_name))
312            info["version"] = str(flatten(srv.info.vendor_version))
313        return info
314
315    def check_connection(self) -> dict[str, dict[str, str]]:
316        """Check LDAP Connection"""
317        servers = self.server()
318        server_info = {}
319        # Check each individual server
320        for server in servers.servers:
321            server: Server
322            try:
323                conn = self.connection(server=server)
324                server_info[server.host] = {
325                    "status": "ok",
326                    **self.get_ldap_server_info(conn.server),
327                }
328            except LDAPException as exc:
329                server_info[server.host] = {
330                    "status": str(exc),
331                }
332        # Check server pool
333        try:
334            conn = self.connection()
335            server_info["__all__"] = {
336                "status": "ok",
337                **self.get_ldap_server_info(conn.server),
338            }
339        except LDAPException as exc:
340            server_info["__all__"] = {
341                "status": str(exc),
342            }
343        return server_info
344
345    class Meta:
346        verbose_name = _("LDAP Source")
347        verbose_name_plural = _("LDAP Sources")
348
349
class LDAPSourcePropertyMapping(PropertyMapping):
    """Map LDAP Property to User or Group object attribute"""

    class Meta:
        verbose_name = _("LDAP Source Property Mapping")
        verbose_name_plural = _("LDAP Source Property Mappings")

    def __str__(self):
        # The mapping is displayed by its name.
        display_name = str(self.name)
        return display_name

    @property
    def component(self) -> str:
        """Name of the frontend component used to edit this object"""
        form_component = "ak-property-mapping-source-ldap-form"
        return form_component

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this model"""
        from authentik.sources.ldap.api import (
            LDAPSourcePropertyMappingSerializer as serializer_class,
        )

        return serializer_class
369
370
class UserLDAPSourceConnection(UserSourceConnection):
    # UserSourceConnection extended with an indexed, optional `validated_by` UUID.
    validated_by = models.UUIDField(
        help_text=_("Unique ID used while checking if this object still exists in the directory."),
        blank=True,
        null=True,
    )

    class Meta:
        indexes = [models.Index(fields=["validated_by"])]
        verbose_name = _("User LDAP Source Connection")
        verbose_name_plural = _("User LDAP Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this model"""
        from authentik.sources.ldap.api import (
            UserLDAPSourceConnectionSerializer as serializer_class,
        )

        return serializer_class
392
393
class GroupLDAPSourceConnection(GroupSourceConnection):
    # GroupSourceConnection extended with an indexed, optional `validated_by` UUID.
    validated_by = models.UUIDField(
        help_text=_("Unique ID used while checking if this object still exists in the directory."),
        blank=True,
        null=True,
    )

    class Meta:
        indexes = [models.Index(fields=["validated_by"])]
        verbose_name = _("Group LDAP Source Connection")
        verbose_name_plural = _("Group LDAP Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this model"""
        from authentik.sources.ldap.api import (
            GroupLDAPSourceConnectionSerializer as serializer_class,
        )

        return serializer_class
LDAP_TIMEOUT = 15
LDAP_UNIQUENESS = 'ldap_uniq'
LDAP_DISTINGUISHED_NAME = 'distinguishedName'
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def flatten(value: Any) -> Any:
39def flatten(value: Any) -> Any:
40    """Flatten `value` if its a list, set or tuple"""
41    if isinstance(value, list | set | tuple):
42        if len(value) < 1:
43            return None
44        if isinstance(value, set):
45            return value.pop()
46        return value[0]
47    return value

Flatten value if it's a list, set or tuple

class MultiURLValidator(authentik.lib.models.DomainlessURLValidator):
class MultiURLValidator(DomainlessURLValidator):
    """Same as DomainlessURLValidator but supports multiple URLs separated with a comma."""

    def __call__(self, value: str):
        # Normalise both cases (single URL / comma-separated list) into one
        # list of candidates, then run the parent validator on each of them.
        candidates = value.split(",") if "," in value else [value]
        for candidate in candidates:
            super().__call__(candidate)

Same as DomainlessURLValidator but supports multiple URLs separated with a comma.

 61class LDAPSource(IncomingSyncSource):
 62    """Federate LDAP Directory with authentik, or create new accounts in LDAP."""
 63
 64    server_uri = models.TextField(
 65        validators=[MultiURLValidator(schemes=["ldap", "ldaps"])],
 66        verbose_name=_("Server URI"),
 67    )
 68    peer_certificate = models.ForeignKey(
 69        CertificateKeyPair,
 70        on_delete=models.SET_DEFAULT,
 71        default=None,
 72        null=True,
 73        related_name="ldap_peer_certificates",
 74        help_text=_(
 75            "Optionally verify the LDAP Server's Certificate against the CA Chain in this keypair."
 76        ),
 77    )
 78    client_certificate = models.ForeignKey(
 79        CertificateKeyPair,
 80        on_delete=models.SET_DEFAULT,
 81        default=None,
 82        null=True,
 83        related_name="ldap_client_certificates",
 84        help_text=_("Client certificate to authenticate against the LDAP Server's Certificate."),
 85    )
 86
 87    bind_cn = models.TextField(verbose_name=_("Bind CN"), blank=True)
 88    bind_password = models.TextField(blank=True)
 89    start_tls = models.BooleanField(default=False, verbose_name=_("Enable Start TLS"))
 90    sni = models.BooleanField(default=False, verbose_name=_("Use Server URI for SNI verification"))
 91
 92    base_dn = models.TextField(verbose_name=_("Base DN"))
 93    additional_user_dn = models.TextField(
 94        help_text=_("Prepended to Base DN for User-queries."),
 95        verbose_name=_("Addition User DN"),
 96        blank=True,
 97    )
 98    additional_group_dn = models.TextField(
 99        help_text=_("Prepended to Base DN for Group-queries."),
100        verbose_name=_("Addition Group DN"),
101        blank=True,
102    )
103
104    user_object_filter = models.TextField(
105        default="(objectClass=person)",
106        help_text=_("Consider Objects matching this filter to be Users."),
107    )
108    user_membership_attribute = models.TextField(
109        default=LDAP_DISTINGUISHED_NAME,
110        help_text=_("Attribute which matches the value of `group_membership_field`."),
111    )
112    group_membership_field = models.TextField(
113        default="member", help_text=_("Field which contains members of a group.")
114    )
115    group_object_filter = models.TextField(
116        default="(objectClass=group)",
117        help_text=_("Consider Objects matching this filter to be Groups."),
118    )
119    object_uniqueness_field = models.TextField(
120        default="objectSid", help_text=_("Field which contains a unique Identifier.")
121    )
122
123    password_login_update_internal_password = models.BooleanField(
124        default=False,
125        help_text=_("Update internal authentik password when login succeeds with LDAP"),
126    )
127
128    sync_users = models.BooleanField(default=True)
129    sync_users_password = models.BooleanField(
130        default=True,
131        help_text=_(
132            "When a user changes their password, sync it back to LDAP. "
133            "This can only be enabled on a single LDAP source."
134        ),
135    )
136    sync_groups = models.BooleanField(default=True)
137    sync_parent_group = models.ForeignKey(
138        Group, blank=True, null=True, default=None, on_delete=models.SET_DEFAULT
139    )
140
141    lookup_groups_from_user = models.BooleanField(
142        default=False,
143        help_text=_(
144            "Lookup group membership based on a user attribute instead of a group attribute. "
145            "This allows nested group resolution on systems like FreeIPA and Active Directory"
146        ),
147    )
148
149    delete_not_found_objects = models.BooleanField(
150        default=False,
151        help_text=_(
152            "Delete authentik users and groups which were previously supplied by this source, "
153            "but are now missing from it."
154        ),
155    )
156
157    @property
158    def component(self) -> str:
159        return "ak-source-ldap-form"
160
161    @property
162    def serializer(self) -> type[Serializer]:
163        from authentik.sources.ldap.api import LDAPSourceSerializer
164
165        return LDAPSourceSerializer
166
167    @property
168    def schedule_specs(self) -> list[ScheduleSpec]:
169        from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync
170
171        return [
172            ScheduleSpec(
173                actor=ldap_sync,
174                uid=self.slug,
175                args=(self.pk,),
176                crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *",
177                send_on_save=True,
178            ),
179            ScheduleSpec(
180                actor=ldap_connectivity_check,
181                uid=self.slug,
182                args=(self.pk,),
183                crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *",
184                send_on_save=True,
185            ),
186        ]
187
188    @property
189    def property_mapping_type(self) -> type[PropertyMapping]:
190        from authentik.sources.ldap.models import LDAPSourcePropertyMapping
191
192        return LDAPSourcePropertyMapping
193
194    def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
195        properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn
196        if self.object_uniqueness_field in ldap:
197            properties["attributes"][LDAP_UNIQUENESS] = flatten(
198                ldap.get(self.object_uniqueness_field)
199            )
200        return properties
201
202    def get_base_user_properties(self, **kwargs):
203        return self.update_properties_with_uniqueness_field({}, **kwargs)
204
205    def get_base_group_properties(self, **kwargs):
206        return self.update_properties_with_uniqueness_field(
207            {
208                "parent": self.sync_parent_group,
209            },
210            **kwargs,
211        )
212
213    @property
214    def icon_url(self) -> str:
215        return static("authentik/sources/ldap.png")
216
217    def server(self, **kwargs) -> ServerPool:
218        """Get LDAP Server/ServerPool"""
219        servers = []
220        tls_kwargs = {}
221        if self.peer_certificate:
222            tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data
223            tls_kwargs["validate"] = CERT_REQUIRED
224        if self.client_certificate:
225            temp_dir = mkdtemp()
226            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert:
227                temp_cert.write(self.client_certificate.certificate_data)
228                certificate_file = temp_cert.name
229                chmod(certificate_file, 0o600)
230            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key:
231                temp_key.write(self.client_certificate.key_data)
232                private_key_file = temp_key.name
233                chmod(private_key_file, 0o600)
234            tls_kwargs["local_private_key_file"] = private_key_file
235            tls_kwargs["local_certificate_file"] = certificate_file
236        if ciphers := CONFIG.get("ldap.tls.ciphers", None):
237            tls_kwargs["ciphers"] = ciphers.strip()
238        if self.sni:
239            tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip()
240        server_kwargs = {
241            "get_info": ALL,
242            "connect_timeout": LDAP_TIMEOUT,
243            "tls": Tls(**tls_kwargs),
244        }
245        server_kwargs.update(kwargs)
246        if "," in self.server_uri:
247            for server in self.server_uri.split(","):
248                servers.append(Server(server, **server_kwargs))
249        else:
250            servers = [Server(self.server_uri, **server_kwargs)]
251        return ServerPool(servers, RANDOM, active=5, exhaust=True)
252
253    def connection(
254        self,
255        server: Server | None = None,
256        server_kwargs: dict | None = None,
257        connection_kwargs: dict | None = None,
258    ) -> Connection:
259        """Get a fully connected and bound LDAP Connection"""
260        server_kwargs = server_kwargs or {}
261        connection_kwargs = connection_kwargs or {}
262        if self.bind_cn is not None:
263            connection_kwargs.setdefault("user", self.bind_cn)
264        if self.bind_password is not None:
265            connection_kwargs.setdefault("password", self.bind_password)
266        conn = Connection(
267            server or self.server(**server_kwargs),
268            raise_exceptions=True,
269            receive_timeout=LDAP_TIMEOUT,
270            **connection_kwargs,
271        )
272
273        if self.start_tls:
274            LOGGER.debug("Connection StartTLS", source=self)
275            conn.start_tls(read_server_info=False)
276        try:
277            successful = conn.bind()
278            if successful:
279                return conn
280        except (LDAPSchemaError, LDAPInsufficientAccessRightsResult) as exc:
281            # Schema error, so try connecting without schema info
282            # See https://github.com/goauthentik/authentik/issues/4590
283            # See also https://github.com/goauthentik/authentik/issues/3399
284            if server_kwargs.get("get_info", ALL) == NONE:
285                LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc)
286                raise exc
287            LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc)
288            server_kwargs["get_info"] = NONE
289            return self.connection(server, server_kwargs, connection_kwargs)
290        finally:
291            if conn.server.tls.certificate_file is not None and exists(
292                conn.server.tls.certificate_file
293            ):
294                rmtree(dirname(conn.server.tls.certificate_file))
295        return RuntimeError("Failed to bind")
296
297    @property
298    def sync_lock(self) -> pglock.advisory:
299        """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening"""
300        return pglock.advisory(
301            lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}",
302            timeout=0,
303            side_effect=pglock.Return,
304        )
305
306    def get_ldap_server_info(self, srv: Server) -> dict[str, str]:
307        info = {
308            "vendor": _("N/A"),
309            "version": _("N/A"),
310        }
311        if srv.info:
312            info["vendor"] = str(flatten(srv.info.vendor_name))
313            info["version"] = str(flatten(srv.info.vendor_version))
314        return info
315
316    def check_connection(self) -> dict[str, dict[str, str]]:
317        """Check LDAP Connection"""
318        servers = self.server()
319        server_info = {}
320        # Check each individual server
321        for server in servers.servers:
322            server: Server
323            try:
324                conn = self.connection(server=server)
325                server_info[server.host] = {
326                    "status": "ok",
327                    **self.get_ldap_server_info(conn.server),
328                }
329            except LDAPException as exc:
330                server_info[server.host] = {
331                    "status": str(exc),
332                }
333        # Check server pool
334        try:
335            conn = self.connection()
336            server_info["__all__"] = {
337                "status": "ok",
338                **self.get_ldap_server_info(conn.server),
339            }
340        except LDAPException as exc:
341            server_info["__all__"] = {
342                "status": str(exc),
343            }
344        return server_info
345
346    class Meta:
347        verbose_name = _("LDAP Source")
348        verbose_name_plural = _("LDAP Sources")

Federate LDAP Directory with authentik, or create new accounts in LDAP.

def server_uri(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

peer_certificate

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

client_certificate

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def bind_cn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def bind_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def start_tls(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sni(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def base_dn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def additional_user_dn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def additional_group_dn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user_object_filter(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user_membership_attribute(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def group_membership_field(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def group_object_filter(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def object_uniqueness_field(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def password_login_update_internal_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_users(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_users_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_groups(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

sync_parent_group

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def lookup_groups_from_user(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def delete_not_found_objects(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

component: str
157    @property
158    def component(self) -> str:
159        return "ak-source-ldap-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
161    @property
162    def serializer(self) -> type[Serializer]:
163        from authentik.sources.ldap.api import LDAPSourceSerializer
164
165        return LDAPSourceSerializer

Get serializer for this model

schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
167    @property
168    def schedule_specs(self) -> list[ScheduleSpec]:
169        from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync
170
171        return [
172            ScheduleSpec(
173                actor=ldap_sync,
174                uid=self.slug,
175                args=(self.pk,),
176                crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *",
177                send_on_save=True,
178            ),
179            ScheduleSpec(
180                actor=ldap_connectivity_check,
181                uid=self.slug,
182                args=(self.pk,),
183                crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *",
184                send_on_save=True,
185            ),
186        ]
property_mapping_type: type[authentik.core.models.PropertyMapping]
188    @property
189    def property_mapping_type(self) -> type[PropertyMapping]:
190        from authentik.sources.ldap.models import LDAPSourcePropertyMapping
191
192        return LDAPSourcePropertyMapping

Return property mapping type used by this object

def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
194    def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs):
195        properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn
196        if self.object_uniqueness_field in ldap:
197            properties["attributes"][LDAP_UNIQUENESS] = flatten(
198                ldap.get(self.object_uniqueness_field)
199            )
200        return properties
def get_base_user_properties(self, **kwargs):
202    def get_base_user_properties(self, **kwargs):
203        return self.update_properties_with_uniqueness_field({}, **kwargs)

Get base properties for a user to build final properties upon.

def get_base_group_properties(self, **kwargs):
205    def get_base_group_properties(self, **kwargs):
206        return self.update_properties_with_uniqueness_field(
207            {
208                "parent": self.sync_parent_group,
209            },
210            **kwargs,
211        )

Get base properties for a group to build final properties upon.

icon_url: str
213    @property
214    def icon_url(self) -> str:
215        return static("authentik/sources/ldap.png")

Get the URL to the source icon

def server(self, **kwargs) -> ldap3.core.pooling.ServerPool:
217    def server(self, **kwargs) -> ServerPool:
218        """Get LDAP Server/ServerPool"""
219        servers = []
220        tls_kwargs = {}
221        if self.peer_certificate:
222            tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data
223            tls_kwargs["validate"] = CERT_REQUIRED
224        if self.client_certificate:
225            temp_dir = mkdtemp()
226            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert:
227                temp_cert.write(self.client_certificate.certificate_data)
228                certificate_file = temp_cert.name
229                chmod(certificate_file, 0o600)
230            with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key:
231                temp_key.write(self.client_certificate.key_data)
232                private_key_file = temp_key.name
233                chmod(private_key_file, 0o600)
234            tls_kwargs["local_private_key_file"] = private_key_file
235            tls_kwargs["local_certificate_file"] = certificate_file
236        if ciphers := CONFIG.get("ldap.tls.ciphers", None):
237            tls_kwargs["ciphers"] = ciphers.strip()
238        if self.sni:
239            tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip()
240        server_kwargs = {
241            "get_info": ALL,
242            "connect_timeout": LDAP_TIMEOUT,
243            "tls": Tls(**tls_kwargs),
244        }
245        server_kwargs.update(kwargs)
246        if "," in self.server_uri:
247            for server in self.server_uri.split(","):
248                servers.append(Server(server, **server_kwargs))
249        else:
250            servers = [Server(self.server_uri, **server_kwargs)]
251        return ServerPool(servers, RANDOM, active=5, exhaust=True)

Get LDAP Server/ServerPool

def connection( self, server: ldap3.core.server.Server | None = None, server_kwargs: dict | None = None, connection_kwargs: dict | None = None) -> ldap3.core.connection.Connection:
253    def connection(
254        self,
255        server: Server | None = None,
256        server_kwargs: dict | None = None,
257        connection_kwargs: dict | None = None,
258    ) -> Connection:
259        """Get a fully connected and bound LDAP Connection"""
260        server_kwargs = server_kwargs or {}
261        connection_kwargs = connection_kwargs or {}
262        if self.bind_cn is not None:
263            connection_kwargs.setdefault("user", self.bind_cn)
264        if self.bind_password is not None:
265            connection_kwargs.setdefault("password", self.bind_password)
266        conn = Connection(
267            server or self.server(**server_kwargs),
268            raise_exceptions=True,
269            receive_timeout=LDAP_TIMEOUT,
270            **connection_kwargs,
271        )
272
273        if self.start_tls:
274            LOGGER.debug("Connection StartTLS", source=self)
275            conn.start_tls(read_server_info=False)
276        try:
277            successful = conn.bind()
278            if successful:
279                return conn
280        except (LDAPSchemaError, LDAPInsufficientAccessRightsResult) as exc:
281            # Schema error, so try connecting without schema info
282            # See https://github.com/goauthentik/authentik/issues/4590
283            # See also https://github.com/goauthentik/authentik/issues/3399
284            if server_kwargs.get("get_info", ALL) == NONE:
285                LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc)
286                raise exc
287            LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc)
288            server_kwargs["get_info"] = NONE
289            return self.connection(server, server_kwargs, connection_kwargs)
290        finally:
291            if conn.server.tls.certificate_file is not None and exists(
292                conn.server.tls.certificate_file
293            ):
294                rmtree(dirname(conn.server.tls.certificate_file))
295        return RuntimeError("Failed to bind")

Get a fully connected and bound LDAP Connection

sync_lock: pglock.core.advisory
297    @property
298    def sync_lock(self) -> pglock.advisory:
299        """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening"""
300        return pglock.advisory(
301            lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}",
302            timeout=0,
303            side_effect=pglock.Return,
304        )

Postgres lock for syncing LDAP to prevent multiple parallel syncs happening

def get_ldap_server_info(self, srv: ldap3.core.server.Server) -> dict[str, str]:
306    def get_ldap_server_info(self, srv: Server) -> dict[str, str]:
307        info = {
308            "vendor": _("N/A"),
309            "version": _("N/A"),
310        }
311        if srv.info:
312            info["vendor"] = str(flatten(srv.info.vendor_name))
313            info["version"] = str(flatten(srv.info.vendor_version))
314        return info
def check_connection(self) -> dict[str, dict[str, str]]:
316    def check_connection(self) -> dict[str, dict[str, str]]:
317        """Check LDAP Connection"""
318        servers = self.server()
319        server_info = {}
320        # Check each individual server
321        for server in servers.servers:
322            server: Server
323            try:
324                conn = self.connection(server=server)
325                server_info[server.host] = {
326                    "status": "ok",
327                    **self.get_ldap_server_info(conn.server),
328                }
329            except LDAPException as exc:
330                server_info[server.host] = {
331                    "status": str(exc),
332                }
333        # Check server pool
334        try:
335            conn = self.connection()
336            server_info["__all__"] = {
337                "status": "ok",
338                **self.get_ldap_server_info(conn.server),
339            }
340        except LDAPException as exc:
341            server_info["__all__"] = {
342                "status": str(exc),
343            }
344        return server_info

Check LDAP Connection

def sync_outgoing_trigger_mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

schedules

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

peer_certificate_id
client_certificate_id
sync_parent_group_id
source_ptr_id
source_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

def get_sync_outgoing_trigger_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

class LDAPSource.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class LDAPSource.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class LDAPSourcePropertyMapping(authentik.core.models.PropertyMapping):
class LDAPSourcePropertyMapping(PropertyMapping):
    """Map LDAP Property to User or Group object attribute"""

    def __str__(self):
        # Shown as the mapping's name.
        return str(self.name)

    @property
    def component(self) -> str:
        """Return component used to edit this object"""
        form_component = "ak-property-mapping-source-ldap-form"
        return form_component

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        from authentik.sources.ldap.api import (
            LDAPSourcePropertyMappingSerializer as serializer_class,
        )

        return serializer_class

    class Meta:
        verbose_name = _("LDAP Source Property Mapping")
        verbose_name_plural = _("LDAP Source Property Mappings")

Map LDAP Property to User or Group object attribute

component: str
354    @property
355    def component(self) -> str:
356        return "ak-property-mapping-source-ldap-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
358    @property
359    def serializer(self) -> type[Serializer]:
360        from authentik.sources.ldap.api import LDAPSourcePropertyMappingSerializer
361
362        return LDAPSourcePropertyMappingSerializer

Get serializer for this model

propertymapping_ptr_id
propertymapping_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class LDAPSourcePropertyMapping.DoesNotExist(authentik.core.models.PropertyMapping.DoesNotExist):

The requested object does not exist

class LDAPSourcePropertyMapping.MultipleObjectsReturned(authentik.core.models.PropertyMapping.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class UserLDAPSourceConnection(authentik.core.models.UserSourceConnection):
class UserLDAPSourceConnection(UserSourceConnection):
    # User source connection for LDAP sources, extended with an indexed,
    # nullable UUID used while checking whether the object still exists
    # in the directory.
    validated_by = models.UUIDField(
        help_text=_("Unique ID used while checking if this object still exists in the directory."),
        blank=True,
        null=True,
    )

    class Meta:
        indexes = [models.Index(fields=["validated_by"])]
        verbose_name = _("User LDAP Source Connection")
        verbose_name_plural = _("User LDAP Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        from authentik.sources.ldap.api import UserLDAPSourceConnectionSerializer

        return UserLDAPSourceConnectionSerializer

UserLDAPSourceConnection(id, created, last_updated, user, source, identifier, usersourceconnection_ptr, validated_by)

def validated_by(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.Serializer]
379    @property
380    def serializer(self) -> type[Serializer]:
381        from authentik.sources.ldap.api import (
382            UserLDAPSourceConnectionSerializer,
383        )
384
385        return UserLDAPSourceConnectionSerializer

Get serializer for this model

usersourceconnection_ptr_id
usersourceconnection_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class UserLDAPSourceConnection.DoesNotExist(authentik.core.models.UserSourceConnection.DoesNotExist):

The requested object does not exist

class UserLDAPSourceConnection.MultipleObjectsReturned(authentik.core.models.UserSourceConnection.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class GroupLDAPSourceConnection(authentik.core.models.GroupSourceConnection):
class GroupLDAPSourceConnection(GroupSourceConnection):
    # Group source connection for LDAP sources, extended with an indexed,
    # nullable UUID used while checking whether the object still exists
    # in the directory.
    validated_by = models.UUIDField(
        help_text=_("Unique ID used while checking if this object still exists in the directory."),
        blank=True,
        null=True,
    )

    class Meta:
        indexes = [models.Index(fields=["validated_by"])]
        verbose_name = _("Group LDAP Source Connection")
        verbose_name_plural = _("Group LDAP Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        from authentik.sources.ldap.api import GroupLDAPSourceConnectionSerializer

        return GroupLDAPSourceConnectionSerializer

GroupLDAPSourceConnection(id, created, last_updated, group, source, identifier, groupsourceconnection_ptr, validated_by)

def validated_by(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.Serializer]
402    @property
403    def serializer(self) -> type[Serializer]:
404        from authentik.sources.ldap.api import (
405            GroupLDAPSourceConnectionSerializer,
406        )
407
408        return GroupLDAPSourceConnectionSerializer

Get serializer for this model

groupsourceconnection_ptr_id
groupsourceconnection_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class GroupLDAPSourceConnection.DoesNotExist(authentik.core.models.GroupSourceConnection.DoesNotExist):

The requested object does not exist

class GroupLDAPSourceConnection.MultipleObjectsReturned(authentik.core.models.GroupSourceConnection.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.