authentik.sources.ldap.sync.base

Sync LDAP Users and groups into authentik

  1"""Sync LDAP Users and groups into authentik"""
  2
  3from collections.abc import Generator
  4
  5from django.conf import settings
  6from ldap3 import DEREF_ALWAYS, SUBTREE, Connection
  7from structlog.stdlib import BoundLogger, get_logger
  8
  9from authentik.core.sources.mapper import SourceMapper
 10from authentik.lib.config import CONFIG
 11from authentik.lib.sync.mapper import PropertyMappingManager
 12from authentik.sources.ldap.models import LDAPSource, flatten
 13from authentik.tasks.models import Task
 14
 15
 16class BaseLDAPSynchronizer:
 17    """Sync LDAP Users and groups into authentik"""
 18
 19    _source: LDAPSource
 20    _task: Task
 21    _logger: BoundLogger
 22    _connection: Connection
 23    mapper: SourceMapper
 24    manager: PropertyMappingManager
 25
 26    def __init__(self, source: LDAPSource, task: Task):
 27        self._source = source
 28        self._task = task
 29        self._connection = source.connection()
 30        self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__)
 31
 32    @staticmethod
 33    def name() -> str:
 34        """UI name for the type of object this class synchronizes"""
 35        raise NotImplementedError
 36
 37    def sync_full(self):
 38        """Run full sync, this function should only be used in tests"""
 39        if not settings.TEST:  # noqa
 40            raise RuntimeError(
 41                f"{self.__class__.__name__}.sync_full() should only be used in tests"
 42            )
 43        for page in self.get_objects():
 44            self.sync(page)
 45
 46    def sync(self, page_data: list) -> int:
 47        """Sync function, implemented in subclass"""
 48        raise NotImplementedError()
 49
 50    @property
 51    def base_dn_users(self) -> str:
 52        """Shortcut to get full base_dn for user lookups"""
 53        if self._source.additional_user_dn:
 54            return f"{self._source.additional_user_dn},{self._source.base_dn}"
 55        return self._source.base_dn
 56
 57    @property
 58    def base_dn_groups(self) -> str:
 59        """Shortcut to get full base_dn for group lookups"""
 60        if self._source.additional_group_dn:
 61            return f"{self._source.additional_group_dn},{self._source.base_dn}"
 62        return self._source.base_dn
 63
 64    def get_objects(self, **kwargs) -> Generator:
 65        """Get objects from LDAP, implemented in subclass"""
 66        raise NotImplementedError()
 67
 68    def get_attributes(self, object):
 69        if "attributes" not in object:
 70            return
 71        return object.get("attributes", {})
 72
 73    def get_identifier(self, attributes: dict):
 74        if not attributes.get(self._source.object_uniqueness_field):
 75            return
 76        return flatten(attributes[self._source.object_uniqueness_field])
 77
 78    def search_paginator(  # noqa: PLR0913
 79        self,
 80        search_base,
 81        search_filter,
 82        search_scope=SUBTREE,
 83        dereference_aliases=DEREF_ALWAYS,
 84        attributes=None,
 85        size_limit=0,
 86        time_limit=0,
 87        types_only=False,
 88        get_operational_attributes=False,
 89        controls=None,
 90        paged_size=None,
 91        paged_criticality=False,
 92    ):
 93        """Search in pages, returns each page"""
 94        cookie = True
 95        if not paged_size:
 96            paged_size = CONFIG.get_int("ldap.page_size", 50)
 97        while cookie:
 98            self._connection.search(
 99                search_base,
100                search_filter,
101                search_scope,
102                dereference_aliases,
103                attributes,
104                size_limit,
105                time_limit,
106                types_only,
107                get_operational_attributes,
108                controls,
109                paged_size,
110                paged_criticality,
111                None if cookie is True else cookie,
112            )
113            try:
114                cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][
115                    "cookie"
116                ]
117            except KeyError:
118                cookie = None
119            yield self._connection.response
class BaseLDAPSynchronizer:
 17class BaseLDAPSynchronizer:
 18    """Sync LDAP Users and groups into authentik"""
 19
 20    _source: LDAPSource
 21    _task: Task
 22    _logger: BoundLogger
 23    _connection: Connection
 24    mapper: SourceMapper
 25    manager: PropertyMappingManager
 26
 27    def __init__(self, source: LDAPSource, task: Task):
 28        self._source = source
 29        self._task = task
 30        self._connection = source.connection()
 31        self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__)
 32
 33    @staticmethod
 34    def name() -> str:
 35        """UI name for the type of object this class synchronizes"""
 36        raise NotImplementedError
 37
 38    def sync_full(self):
 39        """Run full sync, this function should only be used in tests"""
 40        if not settings.TEST:  # noqa
 41            raise RuntimeError(
 42                f"{self.__class__.__name__}.sync_full() should only be used in tests"
 43            )
 44        for page in self.get_objects():
 45            self.sync(page)
 46
 47    def sync(self, page_data: list) -> int:
 48        """Sync function, implemented in subclass"""
 49        raise NotImplementedError()
 50
 51    @property
 52    def base_dn_users(self) -> str:
 53        """Shortcut to get full base_dn for user lookups"""
 54        if self._source.additional_user_dn:
 55            return f"{self._source.additional_user_dn},{self._source.base_dn}"
 56        return self._source.base_dn
 57
 58    @property
 59    def base_dn_groups(self) -> str:
 60        """Shortcut to get full base_dn for group lookups"""
 61        if self._source.additional_group_dn:
 62            return f"{self._source.additional_group_dn},{self._source.base_dn}"
 63        return self._source.base_dn
 64
 65    def get_objects(self, **kwargs) -> Generator:
 66        """Get objects from LDAP, implemented in subclass"""
 67        raise NotImplementedError()
 68
 69    def get_attributes(self, object):
 70        if "attributes" not in object:
 71            return
 72        return object.get("attributes", {})
 73
 74    def get_identifier(self, attributes: dict):
 75        if not attributes.get(self._source.object_uniqueness_field):
 76            return
 77        return flatten(attributes[self._source.object_uniqueness_field])
 78
 79    def search_paginator(  # noqa: PLR0913
 80        self,
 81        search_base,
 82        search_filter,
 83        search_scope=SUBTREE,
 84        dereference_aliases=DEREF_ALWAYS,
 85        attributes=None,
 86        size_limit=0,
 87        time_limit=0,
 88        types_only=False,
 89        get_operational_attributes=False,
 90        controls=None,
 91        paged_size=None,
 92        paged_criticality=False,
 93    ):
 94        """Search in pages, returns each page"""
 95        cookie = True
 96        if not paged_size:
 97            paged_size = CONFIG.get_int("ldap.page_size", 50)
 98        while cookie:
 99            self._connection.search(
100                search_base,
101                search_filter,
102                search_scope,
103                dereference_aliases,
104                attributes,
105                size_limit,
106                time_limit,
107                types_only,
108                get_operational_attributes,
109                controls,
110                paged_size,
111                paged_criticality,
112                None if cookie is True else cookie,
113            )
114            try:
115                cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][
116                    "cookie"
117                ]
118            except KeyError:
119                cookie = None
120            yield self._connection.response

Sync LDAP Users and groups into authentik

BaseLDAPSynchronizer( source: authentik.sources.ldap.models.LDAPSource, task: authentik.tasks.models.Task)
27    def __init__(self, source: LDAPSource, task: Task):
28        self._source = source
29        self._task = task
30        self._connection = source.connection()
31        self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__)
@staticmethod
def name() -> str:
33    @staticmethod
34    def name() -> str:
35        """UI name for the type of object this class synchronizes"""
36        raise NotImplementedError

UI name for the type of object this class synchronizes

def sync_full(self):
38    def sync_full(self):
39        """Run full sync, this function should only be used in tests"""
40        if not settings.TEST:  # noqa
41            raise RuntimeError(
42                f"{self.__class__.__name__}.sync_full() should only be used in tests"
43            )
44        for page in self.get_objects():
45            self.sync(page)

Run full sync, this function should only be used in tests

def sync(self, page_data: list) -> int:
47    def sync(self, page_data: list) -> int:
48        """Sync function, implemented in subclass"""
49        raise NotImplementedError()

Sync function, implemented in subclass

base_dn_users: str
51    @property
52    def base_dn_users(self) -> str:
53        """Shortcut to get full base_dn for user lookups"""
54        if self._source.additional_user_dn:
55            return f"{self._source.additional_user_dn},{self._source.base_dn}"
56        return self._source.base_dn

Shortcut to get full base_dn for user lookups

base_dn_groups: str
58    @property
59    def base_dn_groups(self) -> str:
60        """Shortcut to get full base_dn for group lookups"""
61        if self._source.additional_group_dn:
62            return f"{self._source.additional_group_dn},{self._source.base_dn}"
63        return self._source.base_dn

Shortcut to get full base_dn for group lookups

def get_objects(self, **kwargs) -> Generator:
65    def get_objects(self, **kwargs) -> Generator:
66        """Get objects from LDAP, implemented in subclass"""
67        raise NotImplementedError()

Get objects from LDAP, implemented in subclass

def get_attributes(self, object):
69    def get_attributes(self, object):
70        if "attributes" not in object:
71            return
72        return object.get("attributes", {})
def get_identifier(self, attributes: dict):
74    def get_identifier(self, attributes: dict):
75        if not attributes.get(self._source.object_uniqueness_field):
76            return
77        return flatten(attributes[self._source.object_uniqueness_field])
def search_paginator( self, search_base, search_filter, search_scope='SUBTREE', dereference_aliases='ALWAYS', attributes=None, size_limit=0, time_limit=0, types_only=False, get_operational_attributes=False, controls=None, paged_size=None, paged_criticality=False):
 79    def search_paginator(  # noqa: PLR0913
 80        self,
 81        search_base,
 82        search_filter,
 83        search_scope=SUBTREE,
 84        dereference_aliases=DEREF_ALWAYS,
 85        attributes=None,
 86        size_limit=0,
 87        time_limit=0,
 88        types_only=False,
 89        get_operational_attributes=False,
 90        controls=None,
 91        paged_size=None,
 92        paged_criticality=False,
 93    ):
 94        """Search in pages, returns each page"""
 95        cookie = True
 96        if not paged_size:
 97            paged_size = CONFIG.get_int("ldap.page_size", 50)
 98        while cookie:
 99            self._connection.search(
100                search_base,
101                search_filter,
102                search_scope,
103                dereference_aliases,
104                attributes,
105                size_limit,
106                time_limit,
107                types_only,
108                get_operational_attributes,
109                controls,
110                paged_size,
111                paged_criticality,
112                None if cookie is True else cookie,
113            )
114            try:
115                cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][
116                    "cookie"
117                ]
118            except KeyError:
119                cookie = None
120            yield self._connection.response

Search in pages, returns each page