authentik.core.sources.mapper
"""Build user and group properties for a source from its property mappings."""

from typing import Any

from django.http import HttpRequest
from structlog.stdlib import get_logger

from authentik.core.expression.exceptions import PropertyMappingExpressionException
from authentik.core.models import Group, PropertyMapping, Source, User
from authentik.events.models import Event, EventAction
from authentik.lib.merge import MERGE_LIST_UNIQUE
from authentik.lib.sync.mapper import PropertyMappingManager
from authentik.policies.utils import delete_none_values

LOGGER = get_logger()


class SourceMapper:
    """Evaluate a source's property mappings to build user or group properties."""

    def __init__(self, source: Source):
        # The source whose mappings and base properties are used by every method.
        self.source = source

    def get_manager(
        self, object_type: type[User | Group], context_keys: list[str]
    ) -> PropertyMappingManager:
        """Get property mapping manager for this source.

        Selects the source's user or group property mappings depending on
        `object_type` (an empty queryset for any other type), ordered by name.
        `context_keys` are appended to the fixed "source" and "properties"
        keys handed to the manager.
        """

        qs = PropertyMapping.objects.none()
        if object_type == User:
            qs = self.source.user_property_mappings.all().select_subclasses()
        elif object_type == Group:
            qs = self.source.group_property_mappings.all().select_subclasses()
        qs = qs.order_by("name")
        return PropertyMappingManager(
            qs,
            self.source.property_mapping_type,
            ["source", "properties"] + context_keys,
        )

    def get_base_properties(
        self, object_type: type[User | Group], **kwargs
    ) -> dict[str, Any | dict[str, Any]]:
        """Get base properties for a user or a group to build final properties upon.

        For users, "path" defaults to the source's user path when the source did
        not provide one. Any type other than User or Group yields an empty dict.
        """
        if object_type == User:
            properties = self.source.get_base_user_properties(**kwargs)
            properties.setdefault("path", self.source.get_user_path())
            return properties
        if object_type == Group:
            return self.source.get_base_group_properties(**kwargs)
        return {}

    def build_object_properties(
        self,
        object_type: type[User | Group],
        manager: PropertyMappingManager | None = None,
        user: User | None = None,
        request: HttpRequest | None = None,
        **kwargs,
    ) -> dict[str, Any | dict[str, Any]]:
        """Build user or group properties from the source's configured property mappings.

        Starts from the base properties for `object_type`, then merges the result
        of every property mapping into them, in the order the manager yields them.

        Args:
            object_type: User or Group; selects the base properties and mappings.
            manager: Manager to evaluate; built via `get_manager` when not given.
            user: Forwarded to the mapping evaluation.
            request: Forwarded to the mapping evaluation.
            **kwargs: Forwarded to the base-property lookup and to the mapping
                evaluation; their names are also passed to `get_manager` as
                additional context keys.

        Returns:
            The merged properties, passed through `delete_none_values`.

        Raises:
            PropertyMappingExpressionException: When a mapping fails to evaluate.
                A configuration-error event is saved and a warning is logged
                before the exception is re-raised.
        """

        properties = self.get_base_properties(object_type, **kwargs)
        properties.setdefault("attributes", {})

        if not manager:
            manager = self.get_manager(object_type, list(kwargs))
        evaluations = manager.iter_eval(
            user=user,
            request=request,
            return_mapping=True,
            source=self.source,
            properties=properties,
            **kwargs,
        )
        # The iterator is advanced by hand so that an evaluation failure raised
        # from next() can be recorded before it propagates.
        while True:
            try:
                value, mapping = next(evaluations)
            except StopIteration:
                break
            except PropertyMappingExpressionException as exc:
                # Report the Source itself (the same object the mappings receive),
                # not this helper, so the event and the log line identify the source.
                Event.new(
                    EventAction.CONFIGURATION_ERROR,
                    message=f"Failed to evaluate property mapping: '{exc.mapping.name}'",
                    source=self.source,
                    mapping=exc.mapping,
                ).save()
                LOGGER.warning(
                    "Mapping failed to evaluate",
                    exc=exc,
                    source=self.source,
                    mapping=exc.mapping,
                )
                raise

            # Only non-empty dict results contribute to the properties.
            if not value or not isinstance(value, dict):
                LOGGER.debug(
                    "Mapping evaluated to None or is not a dict. Skipping",
                    source=self.source,
                    mapping=mapping,
                )
                continue

            MERGE_LIST_UNIQUE.merge(properties, value)

        return delete_none_values(properties)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
SourceMapper:
class SourceMapper:
    """Evaluate a source's property mappings to build user or group properties."""

    def __init__(self, source: Source):
        # The source whose mappings and base properties are used by every method.
        self.source = source

    def get_manager(
        self, object_type: type[User | Group], context_keys: list[str]
    ) -> PropertyMappingManager:
        """Get property mapping manager for this source.

        Selects the source's user or group property mappings depending on
        `object_type` (an empty queryset for any other type), ordered by name.
        `context_keys` are appended to the fixed "source" and "properties"
        keys handed to the manager.
        """

        qs = PropertyMapping.objects.none()
        if object_type == User:
            qs = self.source.user_property_mappings.all().select_subclasses()
        elif object_type == Group:
            qs = self.source.group_property_mappings.all().select_subclasses()
        qs = qs.order_by("name")
        return PropertyMappingManager(
            qs,
            self.source.property_mapping_type,
            ["source", "properties"] + context_keys,
        )

    def get_base_properties(
        self, object_type: type[User | Group], **kwargs
    ) -> dict[str, Any | dict[str, Any]]:
        """Get base properties for a user or a group to build final properties upon.

        For users, "path" defaults to the source's user path when the source did
        not provide one. Any type other than User or Group yields an empty dict.
        """
        if object_type == User:
            properties = self.source.get_base_user_properties(**kwargs)
            properties.setdefault("path", self.source.get_user_path())
            return properties
        if object_type == Group:
            return self.source.get_base_group_properties(**kwargs)
        return {}

    def build_object_properties(
        self,
        object_type: type[User | Group],
        manager: PropertyMappingManager | None = None,
        user: User | None = None,
        request: HttpRequest | None = None,
        **kwargs,
    ) -> dict[str, Any | dict[str, Any]]:
        """Build user or group properties from the source's configured property mappings.

        Starts from the base properties for `object_type`, then merges the result
        of every property mapping into them, in the order the manager yields them.

        Args:
            object_type: User or Group; selects the base properties and mappings.
            manager: Manager to evaluate; built via `get_manager` when not given.
            user: Forwarded to the mapping evaluation.
            request: Forwarded to the mapping evaluation.
            **kwargs: Forwarded to the base-property lookup and to the mapping
                evaluation; their names are also passed to `get_manager` as
                additional context keys.

        Returns:
            The merged properties, passed through `delete_none_values`.

        Raises:
            PropertyMappingExpressionException: When a mapping fails to evaluate.
                A configuration-error event is saved and a warning is logged
                before the exception is re-raised.
        """

        properties = self.get_base_properties(object_type, **kwargs)
        properties.setdefault("attributes", {})

        if not manager:
            manager = self.get_manager(object_type, list(kwargs))
        evaluations = manager.iter_eval(
            user=user,
            request=request,
            return_mapping=True,
            source=self.source,
            properties=properties,
            **kwargs,
        )
        # The iterator is advanced by hand so that an evaluation failure raised
        # from next() can be recorded before it propagates.
        while True:
            try:
                value, mapping = next(evaluations)
            except StopIteration:
                break
            except PropertyMappingExpressionException as exc:
                # Report the Source itself (the same object the mappings receive),
                # not this helper, so the event and the log line identify the source.
                Event.new(
                    EventAction.CONFIGURATION_ERROR,
                    message=f"Failed to evaluate property mapping: '{exc.mapping.name}'",
                    source=self.source,
                    mapping=exc.mapping,
                ).save()
                LOGGER.warning(
                    "Mapping failed to evaluate",
                    exc=exc,
                    source=self.source,
                    mapping=exc.mapping,
                )
                raise

            # Only non-empty dict results contribute to the properties.
            if not value or not isinstance(value, dict):
                LOGGER.debug(
                    "Mapping evaluated to None or is not a dict. Skipping",
                    source=self.source,
                    mapping=mapping,
                )
                continue

            MERGE_LIST_UNIQUE.merge(properties, value)

        return delete_none_values(properties)
SourceMapper(source: authentik.core.models.Source)
def
get_manager( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], context_keys: list[str]) -> authentik.lib.sync.mapper.PropertyMappingManager:
def get_manager(
    self, object_type: type[User | Group], context_keys: list[str]
) -> PropertyMappingManager:
    """Return a property mapping manager over this source's mappings.

    User and Group select the source's user or group property mappings;
    any other type falls back to an empty queryset. The mappings are
    ordered by name, and `context_keys` are appended to the fixed
    "source" and "properties" keys handed to the manager.
    """
    if object_type == User:
        mappings = self.source.user_property_mappings.all().select_subclasses()
    elif object_type == Group:
        mappings = self.source.group_property_mappings.all().select_subclasses()
    else:
        mappings = PropertyMapping.objects.none()

    ordered_mappings = mappings.order_by("name")
    mapping_type = self.source.property_mapping_type
    return PropertyMappingManager(
        ordered_mappings,
        mapping_type,
        ["source", "properties"] + context_keys,
    )
Get property mapping manager for this source.
def
get_base_properties( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
def get_base_properties(
    self, object_type: type[User | Group], **kwargs
) -> dict[str, Any | dict[str, Any]]:
    """Return the starting properties for a user or group.

    For User, the source's base user properties are used and "path" is
    filled in from the source's user path when absent. For Group, the
    source's base group properties are returned unchanged. Any other
    type yields an empty dict.
    """
    if object_type == User:
        user_properties = self.source.get_base_user_properties(**kwargs)
        default_path = self.source.get_user_path()
        user_properties.setdefault("path", default_path)
        return user_properties

    return self.source.get_base_group_properties(**kwargs) if object_type == Group else {}
Get base properties for a user or a group to build final properties upon.
def
build_object_properties( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], manager: authentik.lib.sync.mapper.PropertyMappingManager | None = None, user: authentik.core.models.User | None = None, request: django.http.request.HttpRequest | None = None, **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
def build_object_properties(
    self,
    object_type: type[User | Group],
    manager: PropertyMappingManager | None = None,
    user: User | None = None,
    request: HttpRequest | None = None,
    **kwargs,
) -> dict[str, Any | dict[str, Any]]:
    """Build user or group properties from the source's configured property mappings.

    Starts from the base properties for `object_type`, then merges the result
    of every property mapping into them, in the order the manager yields them.

    Args:
        object_type: User or Group; selects the base properties and mappings.
        manager: Manager to evaluate; built via `get_manager` when not given.
        user: Forwarded to the mapping evaluation.
        request: Forwarded to the mapping evaluation.
        **kwargs: Forwarded to the base-property lookup and to the mapping
            evaluation; their names are also passed to `get_manager` as
            additional context keys.

    Returns:
        The merged properties, passed through `delete_none_values`.

    Raises:
        PropertyMappingExpressionException: When a mapping fails to evaluate.
            A configuration-error event is saved and a warning is logged
            before the exception is re-raised.
    """

    properties = self.get_base_properties(object_type, **kwargs)
    properties.setdefault("attributes", {})

    if not manager:
        manager = self.get_manager(object_type, list(kwargs))
    evaluations = manager.iter_eval(
        user=user,
        request=request,
        return_mapping=True,
        source=self.source,
        properties=properties,
        **kwargs,
    )
    # The iterator is advanced by hand so that an evaluation failure raised
    # from next() can be recorded before it propagates.
    while True:
        try:
            value, mapping = next(evaluations)
        except StopIteration:
            break
        except PropertyMappingExpressionException as exc:
            # Report the Source itself (the same object the mappings receive),
            # not this helper, so the event and the log line identify the source.
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=f"Failed to evaluate property mapping: '{exc.mapping.name}'",
                source=self.source,
                mapping=exc.mapping,
            ).save()
            LOGGER.warning(
                "Mapping failed to evaluate",
                exc=exc,
                source=self.source,
                mapping=exc.mapping,
            )
            raise

        # Only non-empty dict results contribute to the properties.
        if not value or not isinstance(value, dict):
            LOGGER.debug(
                "Mapping evaluated to None or is not a dict. Skipping",
                source=self.source,
                mapping=mapping,
            )
            continue

        MERGE_LIST_UNIQUE.merge(properties, value)

    return delete_none_values(properties)
Build user or group properties from the source's configured property mappings.