authentik.core.sources.mapper

  1from typing import Any
  2
  3from django.http import HttpRequest
  4from structlog.stdlib import get_logger
  5
  6from authentik.core.expression.exceptions import PropertyMappingExpressionException
  7from authentik.core.models import Group, PropertyMapping, Source, User
  8from authentik.events.models import Event, EventAction
  9from authentik.lib.merge import MERGE_LIST_UNIQUE
 10from authentik.lib.sync.mapper import PropertyMappingManager
 11from authentik.policies.utils import delete_none_values
 12
# Module-level structlog logger; used below for the debug/warning messages of the mapper.
LOGGER = get_logger()
 14
 15
 16class SourceMapper:
 17    def __init__(self, source: Source):
 18        self.source = source
 19
 20    def get_manager(
 21        self, object_type: type[User | Group], context_keys: list[str]
 22    ) -> PropertyMappingManager:
 23        """Get property mapping manager for this source."""
 24
 25        qs = PropertyMapping.objects.none()
 26        if object_type == User:
 27            qs = self.source.user_property_mappings.all().select_subclasses()
 28        elif object_type == Group:
 29            qs = self.source.group_property_mappings.all().select_subclasses()
 30        qs = qs.order_by("name")
 31        return PropertyMappingManager(
 32            qs,
 33            self.source.property_mapping_type,
 34            ["source", "properties"] + context_keys,
 35        )
 36
 37    def get_base_properties(
 38        self, object_type: type[User | Group], **kwargs
 39    ) -> dict[str, Any | dict[str, Any]]:
 40        """Get base properties for a user or a group to build final properties upon."""
 41        if object_type == User:
 42            properties = self.source.get_base_user_properties(**kwargs)
 43            properties.setdefault("path", self.source.get_user_path())
 44            return properties
 45        if object_type == Group:
 46            return self.source.get_base_group_properties(**kwargs)
 47        return {}
 48
 49    def build_object_properties(
 50        self,
 51        object_type: type[User | Group],
 52        manager: PropertyMappingManager | None = None,
 53        user: User | None = None,
 54        request: HttpRequest | None = None,
 55        **kwargs,
 56    ) -> dict[str, Any | dict[str, Any]]:
 57        """Build a user or group properties from the source configured property mappings."""
 58
 59        properties = self.get_base_properties(object_type, **kwargs)
 60        if "attributes" not in properties:
 61            properties["attributes"] = {}
 62
 63        if not manager:
 64            manager = self.get_manager(object_type, list(kwargs.keys()))
 65        evaluations = manager.iter_eval(
 66            user=user,
 67            request=request,
 68            return_mapping=True,
 69            source=self.source,
 70            properties=properties,
 71            **kwargs,
 72        )
 73        while True:
 74            try:
 75                value, mapping = next(evaluations)
 76            except StopIteration:
 77                break
 78            except PropertyMappingExpressionException as exc:
 79                Event.new(
 80                    EventAction.CONFIGURATION_ERROR,
 81                    message=f"Failed to evaluate property mapping: '{exc.mapping.name}'",
 82                    source=self,
 83                    mapping=exc.mapping,
 84                ).save()
 85                LOGGER.warning(
 86                    "Mapping failed to evaluate",
 87                    exc=exc,
 88                    source=self,
 89                    mapping=exc.mapping,
 90                )
 91                raise exc
 92
 93            if not value or not isinstance(value, dict):
 94                LOGGER.debug(
 95                    "Mapping evaluated to None or is not a dict. Skipping",
 96                    source=self,
 97                    mapping=mapping,
 98                )
 99                continue
100
101            MERGE_LIST_UNIQUE.merge(properties, value)
102
103        return delete_none_values(properties)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class SourceMapper:
 17class SourceMapper:
 18    def __init__(self, source: Source):
 19        self.source = source
 20
 21    def get_manager(
 22        self, object_type: type[User | Group], context_keys: list[str]
 23    ) -> PropertyMappingManager:
 24        """Get property mapping manager for this source."""
 25
 26        qs = PropertyMapping.objects.none()
 27        if object_type == User:
 28            qs = self.source.user_property_mappings.all().select_subclasses()
 29        elif object_type == Group:
 30            qs = self.source.group_property_mappings.all().select_subclasses()
 31        qs = qs.order_by("name")
 32        return PropertyMappingManager(
 33            qs,
 34            self.source.property_mapping_type,
 35            ["source", "properties"] + context_keys,
 36        )
 37
 38    def get_base_properties(
 39        self, object_type: type[User | Group], **kwargs
 40    ) -> dict[str, Any | dict[str, Any]]:
 41        """Get base properties for a user or a group to build final properties upon."""
 42        if object_type == User:
 43            properties = self.source.get_base_user_properties(**kwargs)
 44            properties.setdefault("path", self.source.get_user_path())
 45            return properties
 46        if object_type == Group:
 47            return self.source.get_base_group_properties(**kwargs)
 48        return {}
 49
 50    def build_object_properties(
 51        self,
 52        object_type: type[User | Group],
 53        manager: PropertyMappingManager | None = None,
 54        user: User | None = None,
 55        request: HttpRequest | None = None,
 56        **kwargs,
 57    ) -> dict[str, Any | dict[str, Any]]:
 58        """Build a user or group properties from the source configured property mappings."""
 59
 60        properties = self.get_base_properties(object_type, **kwargs)
 61        if "attributes" not in properties:
 62            properties["attributes"] = {}
 63
 64        if not manager:
 65            manager = self.get_manager(object_type, list(kwargs.keys()))
 66        evaluations = manager.iter_eval(
 67            user=user,
 68            request=request,
 69            return_mapping=True,
 70            source=self.source,
 71            properties=properties,
 72            **kwargs,
 73        )
 74        while True:
 75            try:
 76                value, mapping = next(evaluations)
 77            except StopIteration:
 78                break
 79            except PropertyMappingExpressionException as exc:
 80                Event.new(
 81                    EventAction.CONFIGURATION_ERROR,
 82                    message=f"Failed to evaluate property mapping: '{exc.mapping.name}'",
 83                    source=self,
 84                    mapping=exc.mapping,
 85                ).save()
 86                LOGGER.warning(
 87                    "Mapping failed to evaluate",
 88                    exc=exc,
 89                    source=self,
 90                    mapping=exc.mapping,
 91                )
 92                raise exc
 93
 94            if not value or not isinstance(value, dict):
 95                LOGGER.debug(
 96                    "Mapping evaluated to None or is not a dict. Skipping",
 97                    source=self,
 98                    mapping=mapping,
 99                )
100                continue
101
102            MERGE_LIST_UNIQUE.merge(properties, value)
103
104        return delete_none_values(properties)
SourceMapper(source: authentik.core.models.Source)
    def __init__(self, source: Source):
        """Create a mapper bound to ``source``.

        Args:
            source: Stored on the instance as ``self.source``; the other methods
                read its property mappings and base properties.
        """
        self.source = source
source
def get_manager( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], context_keys: list[str]) -> authentik.lib.sync.mapper.PropertyMappingManager:
21    def get_manager(
22        self, object_type: type[User | Group], context_keys: list[str]
23    ) -> PropertyMappingManager:
24        """Get property mapping manager for this source."""
25
26        qs = PropertyMapping.objects.none()
27        if object_type == User:
28            qs = self.source.user_property_mappings.all().select_subclasses()
29        elif object_type == Group:
30            qs = self.source.group_property_mappings.all().select_subclasses()
31        qs = qs.order_by("name")
32        return PropertyMappingManager(
33            qs,
34            self.source.property_mapping_type,
35            ["source", "properties"] + context_keys,
36        )

Get property mapping manager for this source.

def get_base_properties( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
38    def get_base_properties(
39        self, object_type: type[User | Group], **kwargs
40    ) -> dict[str, Any | dict[str, Any]]:
41        """Get base properties for a user or a group to build final properties upon."""
42        if object_type == User:
43            properties = self.source.get_base_user_properties(**kwargs)
44            properties.setdefault("path", self.source.get_user_path())
45            return properties
46        if object_type == Group:
47            return self.source.get_base_group_properties(**kwargs)
48        return {}

Get base properties for a user or a group to build final properties upon.

def build_object_properties( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], manager: authentik.lib.sync.mapper.PropertyMappingManager | None = None, user: authentik.core.models.User | None = None, request: django.http.request.HttpRequest | None = None, **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
 50    def build_object_properties(
 51        self,
 52        object_type: type[User | Group],
 53        manager: PropertyMappingManager | None = None,
 54        user: User | None = None,
 55        request: HttpRequest | None = None,
 56        **kwargs,
 57    ) -> dict[str, Any | dict[str, Any]]:
 58        """Build a user or group properties from the source configured property mappings."""
 59
 60        properties = self.get_base_properties(object_type, **kwargs)
 61        if "attributes" not in properties:
 62            properties["attributes"] = {}
 63
 64        if not manager:
 65            manager = self.get_manager(object_type, list(kwargs.keys()))
 66        evaluations = manager.iter_eval(
 67            user=user,
 68            request=request,
 69            return_mapping=True,
 70            source=self.source,
 71            properties=properties,
 72            **kwargs,
 73        )
 74        while True:
 75            try:
 76                value, mapping = next(evaluations)
 77            except StopIteration:
 78                break
 79            except PropertyMappingExpressionException as exc:
 80                Event.new(
 81                    EventAction.CONFIGURATION_ERROR,
 82                    message=f"Failed to evaluate property mapping: '{exc.mapping.name}'",
 83                    source=self,
 84                    mapping=exc.mapping,
 85                ).save()
 86                LOGGER.warning(
 87                    "Mapping failed to evaluate",
 88                    exc=exc,
 89                    source=self,
 90                    mapping=exc.mapping,
 91                )
 92                raise exc
 93
 94            if not value or not isinstance(value, dict):
 95                LOGGER.debug(
 96                    "Mapping evaluated to None or is not a dict. Skipping",
 97                    source=self,
 98                    mapping=mapping,
 99                )
100                continue
101
102            MERGE_LIST_UNIQUE.merge(properties, value)
103
104        return delete_none_values(properties)

Build user or group properties from the source's configured property mappings.