authentik.sources.ldap.sync.base
Sync LDAP Users and groups into authentik
1"""Sync LDAP Users and groups into authentik""" 2 3from collections.abc import Generator 4 5from django.conf import settings 6from ldap3 import DEREF_ALWAYS, SUBTREE, Connection 7from structlog.stdlib import BoundLogger, get_logger 8 9from authentik.core.sources.mapper import SourceMapper 10from authentik.lib.config import CONFIG 11from authentik.lib.sync.mapper import PropertyMappingManager 12from authentik.sources.ldap.models import LDAPSource, flatten 13from authentik.tasks.models import Task 14 15 16class BaseLDAPSynchronizer: 17 """Sync LDAP Users and groups into authentik""" 18 19 _source: LDAPSource 20 _task: Task 21 _logger: BoundLogger 22 _connection: Connection 23 mapper: SourceMapper 24 manager: PropertyMappingManager 25 26 def __init__(self, source: LDAPSource, task: Task): 27 self._source = source 28 self._task = task 29 self._connection = source.connection() 30 self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__) 31 32 @staticmethod 33 def name() -> str: 34 """UI name for the type of object this class synchronizes""" 35 raise NotImplementedError 36 37 def sync_full(self): 38 """Run full sync, this function should only be used in tests""" 39 if not settings.TEST: # noqa 40 raise RuntimeError( 41 f"{self.__class__.__name__}.sync_full() should only be used in tests" 42 ) 43 for page in self.get_objects(): 44 self.sync(page) 45 46 def sync(self, page_data: list) -> int: 47 """Sync function, implemented in subclass""" 48 raise NotImplementedError() 49 50 @property 51 def base_dn_users(self) -> str: 52 """Shortcut to get full base_dn for user lookups""" 53 if self._source.additional_user_dn: 54 return f"{self._source.additional_user_dn},{self._source.base_dn}" 55 return self._source.base_dn 56 57 @property 58 def base_dn_groups(self) -> str: 59 """Shortcut to get full base_dn for group lookups""" 60 if self._source.additional_group_dn: 61 return f"{self._source.additional_group_dn},{self._source.base_dn}" 62 return self._source.base_dn 63 64 def get_objects(self, **kwargs) -> Generator: 65 """Get objects from LDAP, implemented in subclass""" 66 raise NotImplementedError() 67 68 def get_attributes(self, object): 69 if "attributes" not in object: 70 return 71 return object.get("attributes", {}) 72 73 def get_identifier(self, attributes: dict): 74 if not attributes.get(self._source.object_uniqueness_field): 75 return 76 return flatten(attributes[self._source.object_uniqueness_field]) 77 78 def search_paginator( # noqa: PLR0913 79 self, 80 search_base, 81 search_filter, 82 search_scope=SUBTREE, 83 dereference_aliases=DEREF_ALWAYS, 84 attributes=None, 85 size_limit=0, 86 time_limit=0, 87 types_only=False, 88 get_operational_attributes=False, 89 controls=None, 90 paged_size=None, 91 paged_criticality=False, 92 ): 93 """Search in pages, returns each page""" 94 cookie = True 95 if not paged_size: 96 paged_size = CONFIG.get_int("ldap.page_size", 50) 97 while cookie: 98 self._connection.search( 99 search_base, 100 search_filter, 101 search_scope, 102 dereference_aliases, 103 attributes, 104 size_limit, 105 time_limit, 106 types_only, 107 get_operational_attributes, 108 controls, 109 paged_size, 110 paged_criticality, 111 None if cookie is True else cookie, 112 ) 113 try: 114 cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][ 115 "cookie" 116 ] 117 except KeyError: 118 cookie = None 119 yield self._connection.response
class
BaseLDAPSynchronizer:
17class BaseLDAPSynchronizer: 18 """Sync LDAP Users and groups into authentik""" 19 20 _source: LDAPSource 21 _task: Task 22 _logger: BoundLogger 23 _connection: Connection 24 mapper: SourceMapper 25 manager: PropertyMappingManager 26 27 def __init__(self, source: LDAPSource, task: Task): 28 self._source = source 29 self._task = task 30 self._connection = source.connection() 31 self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__) 32 33 @staticmethod 34 def name() -> str: 35 """UI name for the type of object this class synchronizes""" 36 raise NotImplementedError 37 38 def sync_full(self): 39 """Run full sync, this function should only be used in tests""" 40 if not settings.TEST: # noqa 41 raise RuntimeError( 42 f"{self.__class__.__name__}.sync_full() should only be used in tests" 43 ) 44 for page in self.get_objects(): 45 self.sync(page) 46 47 def sync(self, page_data: list) -> int: 48 """Sync function, implemented in subclass""" 49 raise NotImplementedError() 50 51 @property 52 def base_dn_users(self) -> str: 53 """Shortcut to get full base_dn for user lookups""" 54 if self._source.additional_user_dn: 55 return f"{self._source.additional_user_dn},{self._source.base_dn}" 56 return self._source.base_dn 57 58 @property 59 def base_dn_groups(self) -> str: 60 """Shortcut to get full base_dn for group lookups""" 61 if self._source.additional_group_dn: 62 return f"{self._source.additional_group_dn},{self._source.base_dn}" 63 return self._source.base_dn 64 65 def get_objects(self, **kwargs) -> Generator: 66 """Get objects from LDAP, implemented in subclass""" 67 raise NotImplementedError() 68 69 def get_attributes(self, object): 70 if "attributes" not in object: 71 return 72 return object.get("attributes", {}) 73 74 def get_identifier(self, attributes: dict): 75 if not attributes.get(self._source.object_uniqueness_field): 76 return 77 return flatten(attributes[self._source.object_uniqueness_field]) 78 79 def search_paginator( # noqa: PLR0913 80 self, 81 search_base, 82 search_filter, 83 search_scope=SUBTREE, 84 dereference_aliases=DEREF_ALWAYS, 85 attributes=None, 86 size_limit=0, 87 time_limit=0, 88 types_only=False, 89 get_operational_attributes=False, 90 controls=None, 91 paged_size=None, 92 paged_criticality=False, 93 ): 94 """Search in pages, returns each page""" 95 cookie = True 96 if not paged_size: 97 paged_size = CONFIG.get_int("ldap.page_size", 50) 98 while cookie: 99 self._connection.search( 100 search_base, 101 search_filter, 102 search_scope, 103 dereference_aliases, 104 attributes, 105 size_limit, 106 time_limit, 107 types_only, 108 get_operational_attributes, 109 controls, 110 paged_size, 111 paged_criticality, 112 None if cookie is True else cookie, 113 ) 114 try: 115 cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][ 116 "cookie" 117 ] 118 except KeyError: 119 cookie = None 120 yield self._connection.response
Sync LDAP Users and groups into authentik
BaseLDAPSynchronizer( source: authentik.sources.ldap.models.LDAPSource, task: authentik.tasks.models.Task)
@staticmethod
def
name() -> str:
33 @staticmethod 34 def name() -> str: 35 """UI name for the type of object this class synchronizes""" 36 raise NotImplementedError
UI name for the type of object this class synchronizes
def
sync_full(self):
38 def sync_full(self): 39 """Run full sync, this function should only be used in tests""" 40 if not settings.TEST: # noqa 41 raise RuntimeError( 42 f"{self.__class__.__name__}.sync_full() should only be used in tests" 43 ) 44 for page in self.get_objects(): 45 self.sync(page)
Run full sync, this function should only be used in tests
def
sync(self, page_data: list) -> int:
47 def sync(self, page_data: list) -> int: 48 """Sync function, implemented in subclass""" 49 raise NotImplementedError()
Sync function, implemented in subclass
base_dn_users: str
51 @property 52 def base_dn_users(self) -> str: 53 """Shortcut to get full base_dn for user lookups""" 54 if self._source.additional_user_dn: 55 return f"{self._source.additional_user_dn},{self._source.base_dn}" 56 return self._source.base_dn
Shortcut to get full base_dn for user lookups
base_dn_groups: str
58 @property 59 def base_dn_groups(self) -> str: 60 """Shortcut to get full base_dn for group lookups""" 61 if self._source.additional_group_dn: 62 return f"{self._source.additional_group_dn},{self._source.base_dn}" 63 return self._source.base_dn
Shortcut to get full base_dn for group lookups
def
get_objects(self, **kwargs) -> Generator:
65 def get_objects(self, **kwargs) -> Generator: 66 """Get objects from LDAP, implemented in subclass""" 67 raise NotImplementedError()
Get objects from LDAP, implemented in subclass
def
search_paginator( self, search_base, search_filter, search_scope='SUBTREE', dereference_aliases='ALWAYS', attributes=None, size_limit=0, time_limit=0, types_only=False, get_operational_attributes=False, controls=None, paged_size=None, paged_criticality=False):
79 def search_paginator( # noqa: PLR0913 80 self, 81 search_base, 82 search_filter, 83 search_scope=SUBTREE, 84 dereference_aliases=DEREF_ALWAYS, 85 attributes=None, 86 size_limit=0, 87 time_limit=0, 88 types_only=False, 89 get_operational_attributes=False, 90 controls=None, 91 paged_size=None, 92 paged_criticality=False, 93 ): 94 """Search in pages, returns each page""" 95 cookie = True 96 if not paged_size: 97 paged_size = CONFIG.get_int("ldap.page_size", 50) 98 while cookie: 99 self._connection.search( 100 search_base, 101 search_filter, 102 search_scope, 103 dereference_aliases, 104 attributes, 105 size_limit, 106 time_limit, 107 types_only, 108 get_operational_attributes, 109 controls, 110 paged_size, 111 paged_criticality, 112 None if cookie is True else cookie, 113 ) 114 try: 115 cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][ 116 "cookie" 117 ] 118 except KeyError: 119 cookie = None 120 yield self._connection.response
Search in pages, returns each page