authentik.sources.ldap.sync.base

Sync LDAP Users and groups into authentik

  1"""Sync LDAP Users and groups into authentik"""
  2
  3from collections.abc import Generator
  4
  5from django.conf import settings
  6from ldap3 import DEREF_ALWAYS, SUBTREE, Connection
  7from structlog.stdlib import BoundLogger, get_logger
  8
  9from authentik.core.sources.mapper import SourceMapper
 10from authentik.core.sources.matcher import SourceMatcher
 11from authentik.lib.config import CONFIG
 12from authentik.lib.sync.mapper import PropertyMappingManager
 13from authentik.sources.ldap.models import (
 14    GroupLDAPSourceConnection,
 15    LDAPSource,
 16    UserLDAPSourceConnection,
 17    flatten,
 18)
 19from authentik.tasks.models import Task
 20
 21
 22class BaseLDAPSynchronizer:
 23    """Sync LDAP Users and groups into authentik"""
 24
 25    _source: LDAPSource
 26    _task: Task
 27    _logger: BoundLogger
 28    _connection: Connection
 29    mapper: SourceMapper
 30    manager: PropertyMappingManager
 31
 32    def __init__(self, source: LDAPSource, task: Task):
 33        self._source = source
 34        self._task = task
 35        self._connection = source.connection()
 36        self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__)
 37        self.matcher = SourceMatcher(
 38            self._source, UserLDAPSourceConnection, GroupLDAPSourceConnection
 39        )
 40
 41    @staticmethod
 42    def name() -> str:
 43        """UI name for the type of object this class synchronizes"""
 44        raise NotImplementedError
 45
 46    def sync_full(self):
 47        """Run full sync, this function should only be used in tests"""
 48        if not settings.TEST:  # noqa
 49            raise RuntimeError(
 50                f"{self.__class__.__name__}.sync_full() should only be used in tests"
 51            )
 52        for page in self.get_objects():
 53            self.sync(page)
 54
 55    def sync(self, page_data: list) -> int:
 56        """Sync function, implemented in subclass"""
 57        raise NotImplementedError()
 58
 59    @property
 60    def base_dn_users(self) -> str:
 61        """Shortcut to get full base_dn for user lookups"""
 62        if self._source.additional_user_dn:
 63            return f"{self._source.additional_user_dn},{self._source.base_dn}"
 64        return self._source.base_dn
 65
 66    @property
 67    def base_dn_groups(self) -> str:
 68        """Shortcut to get full base_dn for group lookups"""
 69        if self._source.additional_group_dn:
 70            return f"{self._source.additional_group_dn},{self._source.base_dn}"
 71        return self._source.base_dn
 72
 73    def get_objects(self, **kwargs) -> Generator:
 74        """Get objects from LDAP, implemented in subclass"""
 75        raise NotImplementedError()
 76
 77    def get_attributes(self, object):
 78        if "attributes" not in object:
 79            return
 80        return object.get("attributes", {})
 81
 82    def get_identifier(self, attributes: dict):
 83        if not attributes.get(self._source.object_uniqueness_field):
 84            return
 85        return flatten(attributes[self._source.object_uniqueness_field])
 86
 87    def search_paginator(  # noqa: PLR0913
 88        self,
 89        search_base,
 90        search_filter,
 91        search_scope=SUBTREE,
 92        dereference_aliases=DEREF_ALWAYS,
 93        attributes=None,
 94        size_limit=0,
 95        time_limit=0,
 96        types_only=False,
 97        get_operational_attributes=False,
 98        controls=None,
 99        paged_size=None,
100        paged_criticality=False,
101    ):
102        """Search in pages, returns each page"""
103        cookie = True
104        if not paged_size:
105            paged_size = CONFIG.get_int("ldap.page_size", 50)
106        while cookie:
107            self._connection.search(
108                search_base,
109                search_filter,
110                search_scope,
111                dereference_aliases,
112                attributes,
113                size_limit,
114                time_limit,
115                types_only,
116                get_operational_attributes,
117                controls,
118                paged_size,
119                paged_criticality,
120                None if cookie is True else cookie,
121            )
122            try:
123                cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][
124                    "cookie"
125                ]
126            except KeyError:
127                cookie = None
128            yield self._connection.response
class BaseLDAPSynchronizer:
 23class BaseLDAPSynchronizer:
 24    """Sync LDAP Users and groups into authentik"""
 25
 26    _source: LDAPSource
 27    _task: Task
 28    _logger: BoundLogger
 29    _connection: Connection
 30    mapper: SourceMapper
 31    manager: PropertyMappingManager
 32
 33    def __init__(self, source: LDAPSource, task: Task):
 34        self._source = source
 35        self._task = task
 36        self._connection = source.connection()
 37        self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__)
 38        self.matcher = SourceMatcher(
 39            self._source, UserLDAPSourceConnection, GroupLDAPSourceConnection
 40        )
 41
 42    @staticmethod
 43    def name() -> str:
 44        """UI name for the type of object this class synchronizes"""
 45        raise NotImplementedError
 46
 47    def sync_full(self):
 48        """Run full sync, this function should only be used in tests"""
 49        if not settings.TEST:  # noqa
 50            raise RuntimeError(
 51                f"{self.__class__.__name__}.sync_full() should only be used in tests"
 52            )
 53        for page in self.get_objects():
 54            self.sync(page)
 55
 56    def sync(self, page_data: list) -> int:
 57        """Sync function, implemented in subclass"""
 58        raise NotImplementedError()
 59
 60    @property
 61    def base_dn_users(self) -> str:
 62        """Shortcut to get full base_dn for user lookups"""
 63        if self._source.additional_user_dn:
 64            return f"{self._source.additional_user_dn},{self._source.base_dn}"
 65        return self._source.base_dn
 66
 67    @property
 68    def base_dn_groups(self) -> str:
 69        """Shortcut to get full base_dn for group lookups"""
 70        if self._source.additional_group_dn:
 71            return f"{self._source.additional_group_dn},{self._source.base_dn}"
 72        return self._source.base_dn
 73
 74    def get_objects(self, **kwargs) -> Generator:
 75        """Get objects from LDAP, implemented in subclass"""
 76        raise NotImplementedError()
 77
 78    def get_attributes(self, object):
 79        if "attributes" not in object:
 80            return
 81        return object.get("attributes", {})
 82
 83    def get_identifier(self, attributes: dict):
 84        if not attributes.get(self._source.object_uniqueness_field):
 85            return
 86        return flatten(attributes[self._source.object_uniqueness_field])
 87
 88    def search_paginator(  # noqa: PLR0913
 89        self,
 90        search_base,
 91        search_filter,
 92        search_scope=SUBTREE,
 93        dereference_aliases=DEREF_ALWAYS,
 94        attributes=None,
 95        size_limit=0,
 96        time_limit=0,
 97        types_only=False,
 98        get_operational_attributes=False,
 99        controls=None,
100        paged_size=None,
101        paged_criticality=False,
102    ):
103        """Search in pages, returns each page"""
104        cookie = True
105        if not paged_size:
106            paged_size = CONFIG.get_int("ldap.page_size", 50)
107        while cookie:
108            self._connection.search(
109                search_base,
110                search_filter,
111                search_scope,
112                dereference_aliases,
113                attributes,
114                size_limit,
115                time_limit,
116                types_only,
117                get_operational_attributes,
118                controls,
119                paged_size,
120                paged_criticality,
121                None if cookie is True else cookie,
122            )
123            try:
124                cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][
125                    "cookie"
126                ]
127            except KeyError:
128                cookie = None
129            yield self._connection.response

Sync LDAP Users and groups into authentik

BaseLDAPSynchronizer( source: authentik.sources.ldap.models.LDAPSource, task: authentik.tasks.models.Task)
33    def __init__(self, source: LDAPSource, task: Task):
34        self._source = source
35        self._task = task
36        self._connection = source.connection()
37        self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__)
38        self.matcher = SourceMatcher(
39            self._source, UserLDAPSourceConnection, GroupLDAPSourceConnection
40        )
matcher
@staticmethod
def name() -> str:
42    @staticmethod
43    def name() -> str:
44        """UI name for the type of object this class synchronizes"""
45        raise NotImplementedError

UI name for the type of object this class synchronizes

def sync_full(self):
47    def sync_full(self):
48        """Run full sync, this function should only be used in tests"""
49        if not settings.TEST:  # noqa
50            raise RuntimeError(
51                f"{self.__class__.__name__}.sync_full() should only be used in tests"
52            )
53        for page in self.get_objects():
54            self.sync(page)

Run full sync, this function should only be used in tests

def sync(self, page_data: list) -> int:
56    def sync(self, page_data: list) -> int:
57        """Sync function, implemented in subclass"""
58        raise NotImplementedError()

Sync function, implemented in subclass

base_dn_users: str
60    @property
61    def base_dn_users(self) -> str:
62        """Shortcut to get full base_dn for user lookups"""
63        if self._source.additional_user_dn:
64            return f"{self._source.additional_user_dn},{self._source.base_dn}"
65        return self._source.base_dn

Shortcut to get full base_dn for user lookups

base_dn_groups: str
67    @property
68    def base_dn_groups(self) -> str:
69        """Shortcut to get full base_dn for group lookups"""
70        if self._source.additional_group_dn:
71            return f"{self._source.additional_group_dn},{self._source.base_dn}"
72        return self._source.base_dn

Shortcut to get full base_dn for group lookups

def get_objects(self, **kwargs) -> Generator:
74    def get_objects(self, **kwargs) -> Generator:
75        """Get objects from LDAP, implemented in subclass"""
76        raise NotImplementedError()

Get objects from LDAP, implemented in subclass

def get_attributes(self, object):
78    def get_attributes(self, object):
79        if "attributes" not in object:
80            return
81        return object.get("attributes", {})
def get_identifier(self, attributes: dict):
83    def get_identifier(self, attributes: dict):
84        if not attributes.get(self._source.object_uniqueness_field):
85            return
86        return flatten(attributes[self._source.object_uniqueness_field])
def search_paginator( self, search_base, search_filter, search_scope='SUBTREE', dereference_aliases='ALWAYS', attributes=None, size_limit=0, time_limit=0, types_only=False, get_operational_attributes=False, controls=None, paged_size=None, paged_criticality=False):
 88    def search_paginator(  # noqa: PLR0913
 89        self,
 90        search_base,
 91        search_filter,
 92        search_scope=SUBTREE,
 93        dereference_aliases=DEREF_ALWAYS,
 94        attributes=None,
 95        size_limit=0,
 96        time_limit=0,
 97        types_only=False,
 98        get_operational_attributes=False,
 99        controls=None,
100        paged_size=None,
101        paged_criticality=False,
102    ):
103        """Search in pages, returns each page"""
104        cookie = True
105        if not paged_size:
106            paged_size = CONFIG.get_int("ldap.page_size", 50)
107        while cookie:
108            self._connection.search(
109                search_base,
110                search_filter,
111                search_scope,
112                dereference_aliases,
113                attributes,
114                size_limit,
115                time_limit,
116                types_only,
117                get_operational_attributes,
118                controls,
119                paged_size,
120                paged_criticality,
121                None if cookie is True else cookie,
122            )
123            try:
124                cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][
125                    "cookie"
126                ]
127            except KeyError:
128                cookie = None
129            yield self._connection.response

Search in pages, returns each page