authentik.sources.ldap.sync.base
Sync LDAP Users and groups into authentik
1"""Sync LDAP Users and groups into authentik""" 2 3from collections.abc import Generator 4 5from django.conf import settings 6from ldap3 import DEREF_ALWAYS, SUBTREE, Connection 7from structlog.stdlib import BoundLogger, get_logger 8 9from authentik.core.sources.mapper import SourceMapper 10from authentik.core.sources.matcher import SourceMatcher 11from authentik.lib.config import CONFIG 12from authentik.lib.sync.mapper import PropertyMappingManager 13from authentik.sources.ldap.models import ( 14 GroupLDAPSourceConnection, 15 LDAPSource, 16 UserLDAPSourceConnection, 17 flatten, 18) 19from authentik.tasks.models import Task 20 21 22class BaseLDAPSynchronizer: 23 """Sync LDAP Users and groups into authentik""" 24 25 _source: LDAPSource 26 _task: Task 27 _logger: BoundLogger 28 _connection: Connection 29 mapper: SourceMapper 30 manager: PropertyMappingManager 31 32 def __init__(self, source: LDAPSource, task: Task): 33 self._source = source 34 self._task = task 35 self._connection = source.connection() 36 self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__) 37 self.matcher = SourceMatcher( 38 self._source, UserLDAPSourceConnection, GroupLDAPSourceConnection 39 ) 40 41 @staticmethod 42 def name() -> str: 43 """UI name for the type of object this class synchronizes""" 44 raise NotImplementedError 45 46 def sync_full(self): 47 """Run full sync, this function should only be used in tests""" 48 if not settings.TEST: # noqa 49 raise RuntimeError( 50 f"{self.__class__.__name__}.sync_full() should only be used in tests" 51 ) 52 for page in self.get_objects(): 53 self.sync(page) 54 55 def sync(self, page_data: list) -> int: 56 """Sync function, implemented in subclass""" 57 raise NotImplementedError() 58 59 @property 60 def base_dn_users(self) -> str: 61 """Shortcut to get full base_dn for user lookups""" 62 if self._source.additional_user_dn: 63 return f"{self._source.additional_user_dn},{self._source.base_dn}" 64 return self._source.base_dn 65 66 @property 67 def base_dn_groups(self) -> str: 68 """Shortcut to get full base_dn for group lookups""" 69 if self._source.additional_group_dn: 70 return f"{self._source.additional_group_dn},{self._source.base_dn}" 71 return self._source.base_dn 72 73 def get_objects(self, **kwargs) -> Generator: 74 """Get objects from LDAP, implemented in subclass""" 75 raise NotImplementedError() 76 77 def get_attributes(self, object): 78 if "attributes" not in object: 79 return 80 return object.get("attributes", {}) 81 82 def get_identifier(self, attributes: dict): 83 if not attributes.get(self._source.object_uniqueness_field): 84 return 85 return flatten(attributes[self._source.object_uniqueness_field]) 86 87 def search_paginator( # noqa: PLR0913 88 self, 89 search_base, 90 search_filter, 91 search_scope=SUBTREE, 92 dereference_aliases=DEREF_ALWAYS, 93 attributes=None, 94 size_limit=0, 95 time_limit=0, 96 types_only=False, 97 get_operational_attributes=False, 98 controls=None, 99 paged_size=None, 100 paged_criticality=False, 101 ): 102 """Search in pages, returns each page""" 103 cookie = True 104 if not paged_size: 105 paged_size = CONFIG.get_int("ldap.page_size", 50) 106 while cookie: 107 self._connection.search( 108 search_base, 109 search_filter, 110 search_scope, 111 dereference_aliases, 112 attributes, 113 size_limit, 114 time_limit, 115 types_only, 116 get_operational_attributes, 117 controls, 118 paged_size, 119 paged_criticality, 120 None if cookie is True else cookie, 121 ) 122 try: 123 cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][ 124 "cookie" 125 ] 126 except KeyError: 127 cookie = None 128 yield self._connection.response
class
BaseLDAPSynchronizer:
23class BaseLDAPSynchronizer: 24 """Sync LDAP Users and groups into authentik""" 25 26 _source: LDAPSource 27 _task: Task 28 _logger: BoundLogger 29 _connection: Connection 30 mapper: SourceMapper 31 manager: PropertyMappingManager 32 33 def __init__(self, source: LDAPSource, task: Task): 34 self._source = source 35 self._task = task 36 self._connection = source.connection() 37 self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__) 38 self.matcher = SourceMatcher( 39 self._source, UserLDAPSourceConnection, GroupLDAPSourceConnection 40 ) 41 42 @staticmethod 43 def name() -> str: 44 """UI name for the type of object this class synchronizes""" 45 raise NotImplementedError 46 47 def sync_full(self): 48 """Run full sync, this function should only be used in tests""" 49 if not settings.TEST: # noqa 50 raise RuntimeError( 51 f"{self.__class__.__name__}.sync_full() should only be used in tests" 52 ) 53 for page in self.get_objects(): 54 self.sync(page) 55 56 def sync(self, page_data: list) -> int: 57 """Sync function, implemented in subclass""" 58 raise NotImplementedError() 59 60 @property 61 def base_dn_users(self) -> str: 62 """Shortcut to get full base_dn for user lookups""" 63 if self._source.additional_user_dn: 64 return f"{self._source.additional_user_dn},{self._source.base_dn}" 65 return self._source.base_dn 66 67 @property 68 def base_dn_groups(self) -> str: 69 """Shortcut to get full base_dn for group lookups""" 70 if self._source.additional_group_dn: 71 return f"{self._source.additional_group_dn},{self._source.base_dn}" 72 return self._source.base_dn 73 74 def get_objects(self, **kwargs) -> Generator: 75 """Get objects from LDAP, implemented in subclass""" 76 raise NotImplementedError() 77 78 def get_attributes(self, object): 79 if "attributes" not in object: 80 return 81 return object.get("attributes", {}) 82 83 def get_identifier(self, attributes: dict): 84 if not attributes.get(self._source.object_uniqueness_field): 85 return 86 return flatten(attributes[self._source.object_uniqueness_field]) 87 88 def search_paginator( # noqa: PLR0913 89 self, 90 search_base, 91 search_filter, 92 search_scope=SUBTREE, 93 dereference_aliases=DEREF_ALWAYS, 94 attributes=None, 95 size_limit=0, 96 time_limit=0, 97 types_only=False, 98 get_operational_attributes=False, 99 controls=None, 100 paged_size=None, 101 paged_criticality=False, 102 ): 103 """Search in pages, returns each page""" 104 cookie = True 105 if not paged_size: 106 paged_size = CONFIG.get_int("ldap.page_size", 50) 107 while cookie: 108 self._connection.search( 109 search_base, 110 search_filter, 111 search_scope, 112 dereference_aliases, 113 attributes, 114 size_limit, 115 time_limit, 116 types_only, 117 get_operational_attributes, 118 controls, 119 paged_size, 120 paged_criticality, 121 None if cookie is True else cookie, 122 ) 123 try: 124 cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][ 125 "cookie" 126 ] 127 except KeyError: 128 cookie = None 129 yield self._connection.response
Sync LDAP Users and groups into authentik
BaseLDAPSynchronizer( source: authentik.sources.ldap.models.LDAPSource, task: authentik.tasks.models.Task)
33 def __init__(self, source: LDAPSource, task: Task): 34 self._source = source 35 self._task = task 36 self._connection = source.connection() 37 self._logger = get_logger().bind(source=source, syncer=self.__class__.__name__) 38 self.matcher = SourceMatcher( 39 self._source, UserLDAPSourceConnection, GroupLDAPSourceConnection 40 )
@staticmethod
def
name() -> str:
42 @staticmethod 43 def name() -> str: 44 """UI name for the type of object this class synchronizes""" 45 raise NotImplementedError
UI name for the type of object this class synchronizes
def
sync_full(self):
47 def sync_full(self): 48 """Run full sync, this function should only be used in tests""" 49 if not settings.TEST: # noqa 50 raise RuntimeError( 51 f"{self.__class__.__name__}.sync_full() should only be used in tests" 52 ) 53 for page in self.get_objects(): 54 self.sync(page)
Run full sync, this function should only be used in tests
def
sync(self, page_data: list) -> int:
56 def sync(self, page_data: list) -> int: 57 """Sync function, implemented in subclass""" 58 raise NotImplementedError()
Sync function, implemented in subclass
base_dn_users: str
60 @property 61 def base_dn_users(self) -> str: 62 """Shortcut to get full base_dn for user lookups""" 63 if self._source.additional_user_dn: 64 return f"{self._source.additional_user_dn},{self._source.base_dn}" 65 return self._source.base_dn
Shortcut to get full base_dn for user lookups
base_dn_groups: str
67 @property 68 def base_dn_groups(self) -> str: 69 """Shortcut to get full base_dn for group lookups""" 70 if self._source.additional_group_dn: 71 return f"{self._source.additional_group_dn},{self._source.base_dn}" 72 return self._source.base_dn
Shortcut to get full base_dn for group lookups
def
get_objects(self, **kwargs) -> Generator:
74 def get_objects(self, **kwargs) -> Generator: 75 """Get objects from LDAP, implemented in subclass""" 76 raise NotImplementedError()
Get objects from LDAP, implemented in subclass
def
search_paginator( self, search_base, search_filter, search_scope='SUBTREE', dereference_aliases='ALWAYS', attributes=None, size_limit=0, time_limit=0, types_only=False, get_operational_attributes=False, controls=None, paged_size=None, paged_criticality=False):
88 def search_paginator( # noqa: PLR0913 89 self, 90 search_base, 91 search_filter, 92 search_scope=SUBTREE, 93 dereference_aliases=DEREF_ALWAYS, 94 attributes=None, 95 size_limit=0, 96 time_limit=0, 97 types_only=False, 98 get_operational_attributes=False, 99 controls=None, 100 paged_size=None, 101 paged_criticality=False, 102 ): 103 """Search in pages, returns each page""" 104 cookie = True 105 if not paged_size: 106 paged_size = CONFIG.get_int("ldap.page_size", 50) 107 while cookie: 108 self._connection.search( 109 search_base, 110 search_filter, 111 search_scope, 112 dereference_aliases, 113 attributes, 114 size_limit, 115 time_limit, 116 types_only, 117 get_operational_attributes, 118 controls, 119 paged_size, 120 paged_criticality, 121 None if cookie is True else cookie, 122 ) 123 try: 124 cookie = self._connection.result["controls"]["1.2.840.113556.1.4.319"]["value"][ 125 "cookie" 126 ] 127 except KeyError: 128 cookie = None 129 yield self._connection.response
Search in pages, returns each page