authentik.sources.ldap.models
authentik LDAP Models
1"""authentik LDAP Models""" 2 3from os import chmod 4from os.path import dirname, exists 5from shutil import rmtree 6from ssl import CERT_REQUIRED 7from tempfile import NamedTemporaryFile, mkdtemp 8from typing import Any 9 10import pglock 11from django.db import connection, models 12from django.templatetags.static import static 13from django.utils.translation import gettext_lazy as _ 14from ldap3 import ALL, NONE, RANDOM, Connection, Server, ServerPool, Tls 15from ldap3.core.exceptions import ( 16 LDAPAdminLimitExceededResult, 17 LDAPAttributeError, 18 LDAPException, 19 LDAPInsufficientAccessRightsResult, 20 LDAPSchemaError, 21) 22from rest_framework.serializers import Serializer 23from structlog.stdlib import get_logger 24 25from authentik.core.models import ( 26 Group, 27 GroupSourceConnection, 28 PropertyMapping, 29 UserSourceConnection, 30) 31from authentik.crypto.models import CertificateKeyPair 32from authentik.lib.config import CONFIG 33from authentik.lib.models import DomainlessURLValidator 34from authentik.lib.sync.incoming.models import IncomingSyncSource 35from authentik.lib.utils.time import fqdn_rand 36from authentik.tasks.schedules.common import ScheduleSpec 37 38LDAP_TIMEOUT = 15 39LDAP_UNIQUENESS = "ldap_uniq" 40"""Deprecated, don't use""" 41LDAP_DISTINGUISHED_NAME = "distinguishedName" 42LOGGER = get_logger() 43 44 45def flatten(value: Any) -> Any: 46 """Flatten `value` if its a list, set or tuple""" 47 if isinstance(value, list | set | tuple): 48 if len(value) < 1: 49 return None 50 if isinstance(value, set): 51 return value.pop() 52 return value[0] 53 return value 54 55 56class MultiURLValidator(DomainlessURLValidator): 57 """Same as DomainlessURLValidator but supports multiple URLs separated with a comma.""" 58 59 def __call__(self, value: str): 60 if "," in value: 61 for url in value.split(","): 62 super().__call__(url) 63 else: 64 super().__call__(value) 65 66 67class LDAPSource(IncomingSyncSource): 68 """Federate LDAP Directory with authentik, or create new accounts in LDAP.""" 69 70 server_uri = models.TextField( 71 validators=[MultiURLValidator(schemes=["ldap", "ldaps"])], 72 verbose_name=_("Server URI"), 73 ) 74 peer_certificate = models.ForeignKey( 75 CertificateKeyPair, 76 on_delete=models.SET_DEFAULT, 77 default=None, 78 null=True, 79 related_name="ldap_peer_certificates", 80 help_text=_( 81 "Optionally verify the LDAP Server's Certificate against the CA Chain in this keypair." 82 ), 83 ) 84 client_certificate = models.ForeignKey( 85 CertificateKeyPair, 86 on_delete=models.SET_DEFAULT, 87 default=None, 88 null=True, 89 related_name="ldap_client_certificates", 90 help_text=_("Client certificate to authenticate against the LDAP Server's Certificate."), 91 ) 92 93 bind_cn = models.TextField(verbose_name=_("Bind CN"), blank=True) 94 bind_password = models.TextField(blank=True) 95 start_tls = models.BooleanField(default=False, verbose_name=_("Enable Start TLS")) 96 sni = models.BooleanField(default=False, verbose_name=_("Use Server URI for SNI verification")) 97 98 base_dn = models.TextField(verbose_name=_("Base DN")) 99 additional_user_dn = models.TextField( 100 help_text=_("Prepended to Base DN for User-queries."), 101 verbose_name=_("Addition User DN"), 102 blank=True, 103 ) 104 additional_group_dn = models.TextField( 105 help_text=_("Prepended to Base DN for Group-queries."), 106 verbose_name=_("Addition Group DN"), 107 blank=True, 108 ) 109 110 user_object_filter = models.TextField( 111 default="(objectClass=person)", 112 help_text=_("Consider Objects matching this filter to be Users."), 113 ) 114 user_membership_attribute = models.TextField( 115 default=LDAP_DISTINGUISHED_NAME, 116 help_text=_("Attribute which matches the value of `group_membership_field`."), 117 ) 118 group_membership_field = models.TextField( 119 default="member", help_text=_("Field which contains members of a group.") 120 ) 121 group_object_filter = models.TextField( 122 default="(objectClass=group)", 123 help_text=_("Consider Objects matching this filter to be Groups."), 124 ) 125 object_uniqueness_field = models.TextField( 126 default="objectSid", help_text=_("Field which contains a unique Identifier.") 127 ) 128 129 password_login_update_internal_password = models.BooleanField( 130 default=False, 131 help_text=_("Update internal authentik password when login succeeds with LDAP"), 132 ) 133 134 sync_users = models.BooleanField(default=True) 135 sync_users_password = models.BooleanField( 136 default=True, 137 help_text=_( 138 "When a user changes their password, sync it back to LDAP. " 139 "This can only be enabled on a single LDAP source." 140 ), 141 ) 142 sync_groups = models.BooleanField(default=True) 143 sync_parent_group = models.ForeignKey( 144 Group, blank=True, null=True, default=None, on_delete=models.SET_DEFAULT 145 ) 146 147 lookup_groups_from_user = models.BooleanField( 148 default=False, 149 help_text=_( 150 "Lookup group membership based on a user attribute instead of a group attribute. " 151 "This allows nested group resolution on systems like FreeIPA and Active Directory" 152 ), 153 ) 154 155 delete_not_found_objects = models.BooleanField( 156 default=False, 157 help_text=_( 158 "Delete authentik users and groups which were previously supplied by this source, " 159 "but are now missing from it." 160 ), 161 ) 162 163 @property 164 def component(self) -> str: 165 return "ak-source-ldap-form" 166 167 @property 168 def serializer(self) -> type[Serializer]: 169 from authentik.sources.ldap.api.sources import LDAPSourceSerializer 170 171 return LDAPSourceSerializer 172 173 @property 174 def schedule_specs(self) -> list[ScheduleSpec]: 175 from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync 176 177 return [ 178 ScheduleSpec( 179 actor=ldap_sync, 180 uid=self.slug, 181 args=(self.pk,), 182 crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *", 183 send_on_save=True, 184 ), 185 ScheduleSpec( 186 actor=ldap_connectivity_check, 187 uid=self.slug, 188 args=(self.pk,), 189 crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *", 190 send_on_save=True, 191 ), 192 ] 193 194 @property 195 def property_mapping_type(self) -> type[PropertyMapping]: 196 from authentik.sources.ldap.models import LDAPSourcePropertyMapping 197 198 return LDAPSourcePropertyMapping 199 200 def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs): 201 properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn 202 # TODO: Remove after 2026.5, still stored for legacy 203 if self.object_uniqueness_field in ldap: 204 properties["attributes"][LDAP_UNIQUENESS] = flatten( 205 ldap.get(self.object_uniqueness_field) 206 ) 207 return properties 208 209 def get_base_user_properties(self, **kwargs): 210 return self.update_properties_with_uniqueness_field({}, **kwargs) 211 212 def get_base_group_properties(self, **kwargs): 213 return self.update_properties_with_uniqueness_field( 214 { 215 "parent": self.sync_parent_group, 216 }, 217 **kwargs, 218 ) 219 220 @property 221 def icon_url(self) -> str: 222 return static("authentik/sources/ldap.png") 223 224 def server(self, **kwargs) -> ServerPool: 225 """Get LDAP Server/ServerPool""" 226 servers = [] 227 tls_kwargs = {} 228 if self.peer_certificate: 229 tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data 230 tls_kwargs["validate"] = CERT_REQUIRED 231 if self.client_certificate: 232 temp_dir = mkdtemp() 233 with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert: 234 temp_cert.write(self.client_certificate.certificate_data) 235 certificate_file = temp_cert.name 236 chmod(certificate_file, 0o600) 237 with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key: 238 temp_key.write(self.client_certificate.key_data) 239 private_key_file = temp_key.name 240 chmod(private_key_file, 0o600) 241 tls_kwargs["local_private_key_file"] = private_key_file 242 tls_kwargs["local_certificate_file"] = certificate_file 243 if ciphers := CONFIG.get("ldap.tls.ciphers", None): 244 tls_kwargs["ciphers"] = ciphers.strip() 245 if self.sni: 246 tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip() 247 server_kwargs = { 248 "get_info": ALL, 249 "connect_timeout": LDAP_TIMEOUT, 250 "tls": Tls(**tls_kwargs), 251 } 252 server_kwargs.update(kwargs) 253 if "," in self.server_uri: 254 for server in self.server_uri.split(","): 255 servers.append(Server(server, **server_kwargs)) 256 else: 257 servers = [Server(self.server_uri, **server_kwargs)] 258 return ServerPool(servers, RANDOM, active=5, exhaust=True) 259 260 def connection( 261 self, 262 server: Server | None = None, 263 server_kwargs: dict | None = None, 264 connection_kwargs: dict | None = None, 265 ) -> Connection: 266 """Get a fully connected and bound LDAP Connection""" 267 server_kwargs = server_kwargs or {} 268 connection_kwargs = connection_kwargs or {} 269 if self.bind_cn is not None: 270 connection_kwargs.setdefault("user", self.bind_cn) 271 if self.bind_password is not None: 272 connection_kwargs.setdefault("password", self.bind_password) 273 conn = Connection( 274 server or self.server(**server_kwargs), 275 raise_exceptions=True, 276 receive_timeout=LDAP_TIMEOUT, 277 **connection_kwargs, 278 ) 279 280 if self.start_tls: 281 LOGGER.debug("Connection StartTLS", source=self) 282 conn.start_tls(read_server_info=False) 283 try: 284 successful = conn.bind() 285 if successful: 286 return conn 287 except ( 288 LDAPSchemaError, 289 LDAPInsufficientAccessRightsResult, 290 LDAPAdminLimitExceededResult, 291 LDAPAttributeError, 292 ) as exc: 293 # Schema error or rate limit during schema fetch, retry without schema info 294 # See https://github.com/goauthentik/authentik/issues/4590 295 # See also https://github.com/goauthentik/authentik/issues/3399 296 # LDAPAdminLimitExceededResult: Google Secure LDAP rate-limits schema queries 297 # LDAPAttributeError: Google Secure LDAP returns unsupported attrs in schema 298 if server_kwargs.get("get_info", ALL) == NONE: 299 LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc) 300 raise exc 301 LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc) 302 server_kwargs["get_info"] = NONE 303 return self.connection(server, server_kwargs, connection_kwargs) 304 finally: 305 if conn.server.tls.certificate_file is not None and exists( 306 conn.server.tls.certificate_file 307 ): 308 rmtree(dirname(conn.server.tls.certificate_file)) 309 return RuntimeError("Failed to bind") 310 311 @property 312 def sync_lock(self) -> pglock.advisory: 313 """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening""" 314 return pglock.advisory( 315 lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}", 316 timeout=0, 317 side_effect=pglock.Return, 318 ) 319 320 def get_ldap_server_info(self, srv: Server) -> dict[str, str]: 321 info = { 322 "vendor": _("N/A"), 323 "version": _("N/A"), 324 } 325 if srv.info: 326 info["vendor"] = str(flatten(srv.info.vendor_name)) 327 info["version"] = str(flatten(srv.info.vendor_version)) 328 return info 329 330 def check_connection(self) -> dict[str, dict[str, str]]: 331 """Check LDAP Connection""" 332 servers = self.server() 333 server_info = {} 334 # Check each individual server 335 for server in servers.servers: 336 server: Server 337 try: 338 conn = self.connection(server=server) 339 server_info[server.host] = { 340 "status": "ok", 341 **self.get_ldap_server_info(conn.server), 342 } 343 except LDAPException as exc: 344 server_info[server.host] = { 345 "status": str(exc), 346 } 347 # Check server pool 348 try: 349 conn = self.connection() 350 server_info["__all__"] = { 351 "status": "ok", 352 **self.get_ldap_server_info(conn.server), 353 } 354 except LDAPException as exc: 355 server_info["__all__"] = { 356 "status": str(exc), 357 } 358 return server_info 359 360 class Meta: 361 verbose_name = _("LDAP Source") 362 verbose_name_plural = _("LDAP Sources") 363 364 365class LDAPSourcePropertyMapping(PropertyMapping): 366 """Map LDAP Property to User or Group object attribute""" 367 368 @property 369 def component(self) -> str: 370 return "ak-property-mapping-source-ldap-form" 371 372 @property 373 def serializer(self) -> type[Serializer]: 374 from authentik.sources.ldap.api.property_mappings import LDAPSourcePropertyMappingSerializer 375 376 return LDAPSourcePropertyMappingSerializer 377 378 def __str__(self): 379 return str(self.name) 380 381 class Meta: 382 verbose_name = _("LDAP Source Property Mapping") 383 verbose_name_plural = _("LDAP Source Property Mappings") 384 385 386class UserLDAPSourceConnection(UserSourceConnection): 387 validated_by = models.UUIDField( 388 null=True, 389 blank=True, 390 help_text=_("Unique ID used while checking if this object still exists in the directory."), 391 ) 392 393 @property 394 def serializer(self) -> type[Serializer]: 395 from authentik.sources.ldap.api.connections import ( 396 UserLDAPSourceConnectionSerializer, 397 ) 398 399 return UserLDAPSourceConnectionSerializer 400 401 class Meta: 402 verbose_name = _("User LDAP Source Connection") 403 verbose_name_plural = _("User LDAP Source Connections") 404 indexes = [ 405 models.Index(fields=["validated_by"]), 406 ] 407 408 409class GroupLDAPSourceConnection(GroupSourceConnection): 410 validated_by = models.UUIDField( 411 null=True, 412 blank=True, 413 help_text=_("Unique ID used while checking if this object still exists in the directory."), 414 ) 415 416 @property 417 def serializer(self) -> type[Serializer]: 418 from authentik.sources.ldap.api.connections import ( 419 GroupLDAPSourceConnectionSerializer, 420 ) 421 422 return GroupLDAPSourceConnectionSerializer 423 424 class Meta: 425 verbose_name = _("Group LDAP Source Connection") 426 verbose_name_plural = _("Group LDAP Source Connections") 427 indexes = [ 428 models.Index(fields=["validated_by"]), 429 ]
Deprecated, don't use
46def flatten(value: Any) -> Any: 47 """Flatten `value` if its a list, set or tuple""" 48 if isinstance(value, list | set | tuple): 49 if len(value) < 1: 50 return None 51 if isinstance(value, set): 52 return value.pop() 53 return value[0] 54 return value
Flatten value if its a list, set or tuple
57class MultiURLValidator(DomainlessURLValidator): 58 """Same as DomainlessURLValidator but supports multiple URLs separated with a comma.""" 59 60 def __call__(self, value: str): 61 if "," in value: 62 for url in value.split(","): 63 super().__call__(url) 64 else: 65 super().__call__(value)
Same as DomainlessURLValidator but supports multiple URLs separated with a comma.
Inherited Members
68class LDAPSource(IncomingSyncSource): 69 """Federate LDAP Directory with authentik, or create new accounts in LDAP.""" 70 71 server_uri = models.TextField( 72 validators=[MultiURLValidator(schemes=["ldap", "ldaps"])], 73 verbose_name=_("Server URI"), 74 ) 75 peer_certificate = models.ForeignKey( 76 CertificateKeyPair, 77 on_delete=models.SET_DEFAULT, 78 default=None, 79 null=True, 80 related_name="ldap_peer_certificates", 81 help_text=_( 82 "Optionally verify the LDAP Server's Certificate against the CA Chain in this keypair." 83 ), 84 ) 85 client_certificate = models.ForeignKey( 86 CertificateKeyPair, 87 on_delete=models.SET_DEFAULT, 88 default=None, 89 null=True, 90 related_name="ldap_client_certificates", 91 help_text=_("Client certificate to authenticate against the LDAP Server's Certificate."), 92 ) 93 94 bind_cn = models.TextField(verbose_name=_("Bind CN"), blank=True) 95 bind_password = models.TextField(blank=True) 96 start_tls = models.BooleanField(default=False, verbose_name=_("Enable Start TLS")) 97 sni = models.BooleanField(default=False, verbose_name=_("Use Server URI for SNI verification")) 98 99 base_dn = models.TextField(verbose_name=_("Base DN")) 100 additional_user_dn = models.TextField( 101 help_text=_("Prepended to Base DN for User-queries."), 102 verbose_name=_("Addition User DN"), 103 blank=True, 104 ) 105 additional_group_dn = models.TextField( 106 help_text=_("Prepended to Base DN for Group-queries."), 107 verbose_name=_("Addition Group DN"), 108 blank=True, 109 ) 110 111 user_object_filter = models.TextField( 112 default="(objectClass=person)", 113 help_text=_("Consider Objects matching this filter to be Users."), 114 ) 115 user_membership_attribute = models.TextField( 116 default=LDAP_DISTINGUISHED_NAME, 117 help_text=_("Attribute which matches the value of `group_membership_field`."), 118 ) 119 group_membership_field = models.TextField( 120 default="member", help_text=_("Field which contains members of a group.") 121 ) 122 group_object_filter = models.TextField( 123 default="(objectClass=group)", 124 help_text=_("Consider Objects matching this filter to be Groups."), 125 ) 126 object_uniqueness_field = models.TextField( 127 default="objectSid", help_text=_("Field which contains a unique Identifier.") 128 ) 129 130 password_login_update_internal_password = models.BooleanField( 131 default=False, 132 help_text=_("Update internal authentik password when login succeeds with LDAP"), 133 ) 134 135 sync_users = models.BooleanField(default=True) 136 sync_users_password = models.BooleanField( 137 default=True, 138 help_text=_( 139 "When a user changes their password, sync it back to LDAP. " 140 "This can only be enabled on a single LDAP source." 141 ), 142 ) 143 sync_groups = models.BooleanField(default=True) 144 sync_parent_group = models.ForeignKey( 145 Group, blank=True, null=True, default=None, on_delete=models.SET_DEFAULT 146 ) 147 148 lookup_groups_from_user = models.BooleanField( 149 default=False, 150 help_text=_( 151 "Lookup group membership based on a user attribute instead of a group attribute. " 152 "This allows nested group resolution on systems like FreeIPA and Active Directory" 153 ), 154 ) 155 156 delete_not_found_objects = models.BooleanField( 157 default=False, 158 help_text=_( 159 "Delete authentik users and groups which were previously supplied by this source, " 160 "but are now missing from it." 161 ), 162 ) 163 164 @property 165 def component(self) -> str: 166 return "ak-source-ldap-form" 167 168 @property 169 def serializer(self) -> type[Serializer]: 170 from authentik.sources.ldap.api.sources import LDAPSourceSerializer 171 172 return LDAPSourceSerializer 173 174 @property 175 def schedule_specs(self) -> list[ScheduleSpec]: 176 from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync 177 178 return [ 179 ScheduleSpec( 180 actor=ldap_sync, 181 uid=self.slug, 182 args=(self.pk,), 183 crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *", 184 send_on_save=True, 185 ), 186 ScheduleSpec( 187 actor=ldap_connectivity_check, 188 uid=self.slug, 189 args=(self.pk,), 190 crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *", 191 send_on_save=True, 192 ), 193 ] 194 195 @property 196 def property_mapping_type(self) -> type[PropertyMapping]: 197 from authentik.sources.ldap.models import LDAPSourcePropertyMapping 198 199 return LDAPSourcePropertyMapping 200 201 def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs): 202 properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn 203 # TODO: Remove after 2026.5, still stored for legacy 204 if self.object_uniqueness_field in ldap: 205 properties["attributes"][LDAP_UNIQUENESS] = flatten( 206 ldap.get(self.object_uniqueness_field) 207 ) 208 return properties 209 210 def get_base_user_properties(self, **kwargs): 211 return self.update_properties_with_uniqueness_field({}, **kwargs) 212 213 def get_base_group_properties(self, **kwargs): 214 return self.update_properties_with_uniqueness_field( 215 { 216 "parent": self.sync_parent_group, 217 }, 218 **kwargs, 219 ) 220 221 @property 222 def icon_url(self) -> str: 223 return static("authentik/sources/ldap.png") 224 225 def server(self, **kwargs) -> ServerPool: 226 """Get LDAP Server/ServerPool""" 227 servers = [] 228 tls_kwargs = {} 229 if self.peer_certificate: 230 tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data 231 tls_kwargs["validate"] = CERT_REQUIRED 232 if self.client_certificate: 233 temp_dir = mkdtemp() 234 with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert: 235 temp_cert.write(self.client_certificate.certificate_data) 236 certificate_file = temp_cert.name 237 chmod(certificate_file, 0o600) 238 with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key: 239 temp_key.write(self.client_certificate.key_data) 240 private_key_file = temp_key.name 241 chmod(private_key_file, 0o600) 242 tls_kwargs["local_private_key_file"] = private_key_file 243 tls_kwargs["local_certificate_file"] = certificate_file 244 if ciphers := CONFIG.get("ldap.tls.ciphers", None): 245 tls_kwargs["ciphers"] = ciphers.strip() 246 if self.sni: 247 tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip() 248 server_kwargs = { 249 "get_info": ALL, 250 "connect_timeout": LDAP_TIMEOUT, 251 "tls": Tls(**tls_kwargs), 252 } 253 server_kwargs.update(kwargs) 254 if "," in self.server_uri: 255 for server in self.server_uri.split(","): 256 servers.append(Server(server, **server_kwargs)) 257 else: 258 servers = [Server(self.server_uri, **server_kwargs)] 259 return ServerPool(servers, RANDOM, active=5, exhaust=True) 260 261 def connection( 262 self, 263 server: Server | None = None, 264 server_kwargs: dict | None = None, 265 connection_kwargs: dict | None = None, 266 ) -> Connection: 267 """Get a fully connected and bound LDAP Connection""" 268 server_kwargs = server_kwargs or {} 269 connection_kwargs = connection_kwargs or {} 270 if self.bind_cn is not None: 271 connection_kwargs.setdefault("user", self.bind_cn) 272 if self.bind_password is not None: 273 connection_kwargs.setdefault("password", self.bind_password) 274 conn = Connection( 275 server or self.server(**server_kwargs), 276 raise_exceptions=True, 277 receive_timeout=LDAP_TIMEOUT, 278 **connection_kwargs, 279 ) 280 281 if self.start_tls: 282 LOGGER.debug("Connection StartTLS", source=self) 283 conn.start_tls(read_server_info=False) 284 try: 285 successful = conn.bind() 286 if successful: 287 return conn 288 except ( 289 LDAPSchemaError, 290 LDAPInsufficientAccessRightsResult, 291 LDAPAdminLimitExceededResult, 292 LDAPAttributeError, 293 ) as exc: 294 # Schema error or rate limit during schema fetch, retry without schema info 295 # See https://github.com/goauthentik/authentik/issues/4590 296 # See also https://github.com/goauthentik/authentik/issues/3399 297 # LDAPAdminLimitExceededResult: Google Secure LDAP rate-limits schema queries 298 # LDAPAttributeError: Google Secure LDAP returns unsupported attrs in schema 299 if server_kwargs.get("get_info", ALL) == NONE: 300 LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc) 301 raise exc 302 LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc) 303 server_kwargs["get_info"] = NONE 304 return self.connection(server, server_kwargs, connection_kwargs) 305 finally: 306 if conn.server.tls.certificate_file is not None and exists( 307 conn.server.tls.certificate_file 308 ): 309 rmtree(dirname(conn.server.tls.certificate_file)) 310 return RuntimeError("Failed to bind") 311 312 @property 313 def sync_lock(self) -> pglock.advisory: 314 """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening""" 315 return pglock.advisory( 316 lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}", 317 timeout=0, 318 side_effect=pglock.Return, 319 ) 320 321 def get_ldap_server_info(self, srv: Server) -> dict[str, str]: 322 info = { 323 "vendor": _("N/A"), 324 "version": _("N/A"), 325 } 326 if srv.info: 327 info["vendor"] = str(flatten(srv.info.vendor_name)) 328 info["version"] = str(flatten(srv.info.vendor_version)) 329 return info 330 331 def check_connection(self) -> dict[str, dict[str, str]]: 332 """Check LDAP Connection""" 333 servers = self.server() 334 server_info = {} 335 # Check each individual server 336 for server in servers.servers: 337 server: Server 338 try: 339 conn = self.connection(server=server) 340 server_info[server.host] = { 341 "status": "ok", 342 **self.get_ldap_server_info(conn.server), 343 } 344 except LDAPException as exc: 345 server_info[server.host] = { 346 "status": str(exc), 347 } 348 # Check server pool 349 try: 350 conn = self.connection() 351 server_info["__all__"] = { 352 "status": "ok", 353 **self.get_ldap_server_info(conn.server), 354 } 355 except LDAPException as exc: 356 server_info["__all__"] = { 357 "status": str(exc), 358 } 359 return server_info 360 361 class Meta: 362 verbose_name = _("LDAP Source") 363 verbose_name_plural = _("LDAP Sources")
Federate LDAP Directory with authentik, or create new accounts in LDAP.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
168 @property 169 def serializer(self) -> type[Serializer]: 170 from authentik.sources.ldap.api.sources import LDAPSourceSerializer 171 172 return LDAPSourceSerializer
Get serializer for this model
174 @property 175 def schedule_specs(self) -> list[ScheduleSpec]: 176 from authentik.sources.ldap.tasks import ldap_connectivity_check, ldap_sync 177 178 return [ 179 ScheduleSpec( 180 actor=ldap_sync, 181 uid=self.slug, 182 args=(self.pk,), 183 crontab=f"{fqdn_rand('ldap_sync/' + str(self.pk))} */2 * * *", 184 send_on_save=True, 185 ), 186 ScheduleSpec( 187 actor=ldap_connectivity_check, 188 uid=self.slug, 189 args=(self.pk,), 190 crontab=f"{fqdn_rand('ldap_connectivity_check/' + str(self.pk))} * * * *", 191 send_on_save=True, 192 ), 193 ]
195 @property 196 def property_mapping_type(self) -> type[PropertyMapping]: 197 from authentik.sources.ldap.models import LDAPSourcePropertyMapping 198 199 return LDAPSourcePropertyMapping
Return property mapping type used by this object
201 def update_properties_with_uniqueness_field(self, properties, dn, ldap, **kwargs): 202 properties.setdefault("attributes", {})[LDAP_DISTINGUISHED_NAME] = dn 203 # TODO: Remove after 2026.5, still stored for legacy 204 if self.object_uniqueness_field in ldap: 205 properties["attributes"][LDAP_UNIQUENESS] = flatten( 206 ldap.get(self.object_uniqueness_field) 207 ) 208 return properties
210 def get_base_user_properties(self, **kwargs): 211 return self.update_properties_with_uniqueness_field({}, **kwargs)
Get base properties for a user to build final properties upon.
213 def get_base_group_properties(self, **kwargs): 214 return self.update_properties_with_uniqueness_field( 215 { 216 "parent": self.sync_parent_group, 217 }, 218 **kwargs, 219 )
Get base properties for a group to build final properties upon.
225 def server(self, **kwargs) -> ServerPool: 226 """Get LDAP Server/ServerPool""" 227 servers = [] 228 tls_kwargs = {} 229 if self.peer_certificate: 230 tls_kwargs["ca_certs_data"] = self.peer_certificate.certificate_data 231 tls_kwargs["validate"] = CERT_REQUIRED 232 if self.client_certificate: 233 temp_dir = mkdtemp() 234 with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_cert: 235 temp_cert.write(self.client_certificate.certificate_data) 236 certificate_file = temp_cert.name 237 chmod(certificate_file, 0o600) 238 with NamedTemporaryFile(mode="w", delete=False, dir=temp_dir) as temp_key: 239 temp_key.write(self.client_certificate.key_data) 240 private_key_file = temp_key.name 241 chmod(private_key_file, 0o600) 242 tls_kwargs["local_private_key_file"] = private_key_file 243 tls_kwargs["local_certificate_file"] = certificate_file 244 if ciphers := CONFIG.get("ldap.tls.ciphers", None): 245 tls_kwargs["ciphers"] = ciphers.strip() 246 if self.sni: 247 tls_kwargs["sni"] = self.server_uri.split(",", maxsplit=1)[0].strip() 248 server_kwargs = { 249 "get_info": ALL, 250 "connect_timeout": LDAP_TIMEOUT, 251 "tls": Tls(**tls_kwargs), 252 } 253 server_kwargs.update(kwargs) 254 if "," in self.server_uri: 255 for server in self.server_uri.split(","): 256 servers.append(Server(server, **server_kwargs)) 257 else: 258 servers = [Server(self.server_uri, **server_kwargs)] 259 return ServerPool(servers, RANDOM, active=5, exhaust=True)
Get LDAP Server/ServerPool
261 def connection( 262 self, 263 server: Server | None = None, 264 server_kwargs: dict | None = None, 265 connection_kwargs: dict | None = None, 266 ) -> Connection: 267 """Get a fully connected and bound LDAP Connection""" 268 server_kwargs = server_kwargs or {} 269 connection_kwargs = connection_kwargs or {} 270 if self.bind_cn is not None: 271 connection_kwargs.setdefault("user", self.bind_cn) 272 if self.bind_password is not None: 273 connection_kwargs.setdefault("password", self.bind_password) 274 conn = Connection( 275 server or self.server(**server_kwargs), 276 raise_exceptions=True, 277 receive_timeout=LDAP_TIMEOUT, 278 **connection_kwargs, 279 ) 280 281 if self.start_tls: 282 LOGGER.debug("Connection StartTLS", source=self) 283 conn.start_tls(read_server_info=False) 284 try: 285 successful = conn.bind() 286 if successful: 287 return conn 288 except ( 289 LDAPSchemaError, 290 LDAPInsufficientAccessRightsResult, 291 LDAPAdminLimitExceededResult, 292 LDAPAttributeError, 293 ) as exc: 294 # Schema error or rate limit during schema fetch, retry without schema info 295 # See https://github.com/goauthentik/authentik/issues/4590 296 # See also https://github.com/goauthentik/authentik/issues/3399 297 # LDAPAdminLimitExceededResult: Google Secure LDAP rate-limits schema queries 298 # LDAPAttributeError: Google Secure LDAP returns unsupported attrs in schema 299 if server_kwargs.get("get_info", ALL) == NONE: 300 LOGGER.warning("Failed to connect after schema downgrade", source=self, exc=exc) 301 raise exc 302 LOGGER.warning("Downgrading connection to no schema info", source=self, exc=exc) 303 server_kwargs["get_info"] = NONE 304 return self.connection(server, server_kwargs, connection_kwargs) 305 finally: 306 if conn.server.tls.certificate_file is not None and exists( 307 conn.server.tls.certificate_file 308 ): 309 rmtree(dirname(conn.server.tls.certificate_file)) 310 return RuntimeError("Failed to bind")
Get a fully connected and bound LDAP Connection
312 @property 313 def sync_lock(self) -> pglock.advisory: 314 """Postgres lock for syncing LDAP to prevent multiple parallel syncs happening""" 315 return pglock.advisory( 316 lock_id=f"goauthentik.io/{connection.schema_name}/sources/ldap/sync/{self.slug}", 317 timeout=0, 318 side_effect=pglock.Return, 319 )
Postgres lock for syncing LDAP to prevent multiple parallel syncs happening
331 def check_connection(self) -> dict[str, dict[str, str]]: 332 """Check LDAP Connection""" 333 servers = self.server() 334 server_info = {} 335 # Check each individual server 336 for server in servers.servers: 337 server: Server 338 try: 339 conn = self.connection(server=server) 340 server_info[server.host] = { 341 "status": "ok", 342 **self.get_ldap_server_info(conn.server), 343 } 344 except LDAPException as exc: 345 server_info[server.host] = { 346 "status": str(exc), 347 } 348 # Check server pool 349 try: 350 conn = self.connection() 351 server_info["__all__"] = { 352 "status": "ok", 353 **self.get_ldap_server_info(conn.server), 354 } 355 except LDAPException as exc: 356 server_info["__all__"] = { 357 "status": str(exc), 358 } 359 return server_info
Check LDAP Connection
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Inherited Members
- authentik.core.models.Source
- MANAGED_INBUILT
- name
- slug
- user_path_template
- enabled
- promoted
- user_property_mappings
- group_property_mappings
- icon
- authentication_flow
- enrollment_flow
- user_matching_mode
- group_matching_mode
- objects
- get_icon_url
- get_icon_themed_urls
- icon_themed_urls
- get_user_path
- ui_user_settings
- managed
- authentication_flow_id
- enrollment_flow_id
- get_user_matching_mode_display
- get_group_matching_mode_display
- policybindingmodel_ptr_id
- policybindingmodel_ptr
- user_set
- usersourceconnection_set
- groupsourceconnection_set
- oauthsource
- samlsource
- kerberossource
- ldapsource
- identificationstage_set
- plexsource
- scimsource
- telegramsource
- sourcestage_set
The requested object does not exist
The query returned multiple objects when only one was expected.
366class LDAPSourcePropertyMapping(PropertyMapping): 367 """Map LDAP Property to User or Group object attribute""" 368 369 @property 370 def component(self) -> str: 371 return "ak-property-mapping-source-ldap-form" 372 373 @property 374 def serializer(self) -> type[Serializer]: 375 from authentik.sources.ldap.api.property_mappings import LDAPSourcePropertyMappingSerializer 376 377 return LDAPSourcePropertyMappingSerializer 378 379 def __str__(self): 380 return str(self.name) 381 382 class Meta: 383 verbose_name = _("LDAP Source Property Mapping") 384 verbose_name_plural = _("LDAP Source Property Mappings")
Map LDAP Property to User or Group object attribute
373 @property 374 def serializer(self) -> type[Serializer]: 375 from authentik.sources.ldap.api.property_mappings import LDAPSourcePropertyMappingSerializer 376 377 return LDAPSourcePropertyMappingSerializer
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.core.models.PropertyMapping
- pm_uuid
- name
- expression
- objects
- evaluate
- managed
- provider_set
- source_userpropertymappings_set
- source_grouppropertymappings_set
- notificationwebhookmapping
- oauthsourcepropertymapping
- scopemapping
- endpoint_set
- racpropertymapping
- radiusproviderpropertymapping
- samlsourcepropertymapping
- samlpropertymapping
- scimprovider_set
- scimmapping
- kerberossourcepropertymapping
- ldapsourcepropertymapping
- plexsourcepropertymapping
- scimsourcepropertymapping
- telegramsourcepropertymapping
- googleworkspaceprovider_set
- googleworkspaceprovidermapping
- microsoftentraprovider_set
- microsoftentraprovidermapping
The requested object does not exist
The query returned multiple objects when only one was expected.
387class UserLDAPSourceConnection(UserSourceConnection): 388 validated_by = models.UUIDField( 389 null=True, 390 blank=True, 391 help_text=_("Unique ID used while checking if this object still exists in the directory."), 392 ) 393 394 @property 395 def serializer(self) -> type[Serializer]: 396 from authentik.sources.ldap.api.connections import ( 397 UserLDAPSourceConnectionSerializer, 398 ) 399 400 return UserLDAPSourceConnectionSerializer 401 402 class Meta: 403 verbose_name = _("User LDAP Source Connection") 404 verbose_name_plural = _("User LDAP Source Connections") 405 indexes = [ 406 models.Index(fields=["validated_by"]), 407 ]
UserLDAPSourceConnection(id, created, last_updated, user, source, identifier, usersourceconnection_ptr, validated_by)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
394 @property 395 def serializer(self) -> type[Serializer]: 396 from authentik.sources.ldap.api.connections import ( 397 UserLDAPSourceConnectionSerializer, 398 ) 399 400 return UserLDAPSourceConnectionSerializer
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.core.models.UserSourceConnection
- user
- source
- identifier
- objects
- created
- last_updated
- user_id
- source_id
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- id
- useroauthsourceconnection
- usersamlsourceconnection
- userkerberossourceconnection
- userldapsourceconnection
- userplexsourceconnection
- usertelegramsourceconnection
The requested object does not exist
The query returned multiple objects when only one was expected.
410class GroupLDAPSourceConnection(GroupSourceConnection): 411 validated_by = models.UUIDField( 412 null=True, 413 blank=True, 414 help_text=_("Unique ID used while checking if this object still exists in the directory."), 415 ) 416 417 @property 418 def serializer(self) -> type[Serializer]: 419 from authentik.sources.ldap.api.connections import ( 420 GroupLDAPSourceConnectionSerializer, 421 ) 422 423 return GroupLDAPSourceConnectionSerializer 424 425 class Meta: 426 verbose_name = _("Group LDAP Source Connection") 427 verbose_name_plural = _("Group LDAP Source Connections") 428 indexes = [ 429 models.Index(fields=["validated_by"]), 430 ]
GroupLDAPSourceConnection(id, created, last_updated, group, source, identifier, groupsourceconnection_ptr, validated_by)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
417 @property 418 def serializer(self) -> type[Serializer]: 419 from authentik.sources.ldap.api.connections import ( 420 GroupLDAPSourceConnectionSerializer, 421 ) 422 423 return GroupLDAPSourceConnectionSerializer
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.core.models.GroupSourceConnection
- group
- source
- identifier
- objects
- created
- last_updated
- group_id
- source_id
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- id
- groupoauthsourceconnection
- groupsamlsourceconnection
- groupkerberossourceconnection
- groupldapsourceconnection
- groupplexsourceconnection
- grouptelegramsourceconnection
The requested object does not exist
The query returned multiple objects when only one was expected.