authentik.core.api.property_mappings

PropertyMapping API Views

  1"""PropertyMapping API Views"""
  2
  3from json import dumps
  4
  5from django_filters.filters import AllValuesMultipleFilter, BooleanFilter
  6from django_filters.filterset import FilterSet
  7from drf_spectacular.types import OpenApiTypes
  8from drf_spectacular.utils import (
  9    OpenApiParameter,
 10    OpenApiResponse,
 11    extend_schema,
 12    extend_schema_field,
 13)
 14from guardian.shortcuts import get_objects_for_user
 15from rest_framework import mixins
 16from rest_framework.decorators import action
 17from rest_framework.exceptions import PermissionDenied
 18from rest_framework.fields import BooleanField, CharField, SerializerMethodField
 19from rest_framework.relations import PrimaryKeyRelatedField
 20from rest_framework.request import Request
 21from rest_framework.response import Response
 22from rest_framework.viewsets import GenericViewSet
 23
 24from authentik.api.validation import validate
 25from authentik.blueprints.api import ManagedSerializer
 26from authentik.core.api.object_types import TypesMixin
 27from authentik.core.api.used_by import UsedByMixin
 28from authentik.core.api.utils import (
 29    MetaNameSerializer,
 30    ModelSerializer,
 31    PassiveSerializer,
 32)
 33from authentik.core.expression.evaluator import PropertyMappingEvaluator
 34from authentik.core.expression.exceptions import PropertyMappingExpressionException
 35from authentik.core.models import Group, PropertyMapping, User
 36from authentik.events.utils import sanitize_item
 37from authentik.lib.utils.errors import exception_to_string
 38from authentik.policies.api.exec import PolicyTestSerializer
 39from authentik.rbac.decorators import permission_required
 40
 41
class PropertyMappingTestResultSerializer(PassiveSerializer):
    """Result of a Property-mapping test"""

    # Filled by the viewset's `test` action: the JSON dump of the evaluated value,
    # or the stringified exception when evaluation failed.
    result = CharField(read_only=True)
    # Set to False by the `test` action when evaluating the mapping raised.
    successful = BooleanField(read_only=True)
 47
 48
 49class PropertyMappingSerializer(ManagedSerializer, ModelSerializer, MetaNameSerializer):
 50    """PropertyMapping Serializer"""
 51
 52    component = SerializerMethodField()
 53
 54    def get_component(self, obj: PropertyMapping) -> str:
 55        """Get object's component so that we know how to edit the object"""
 56        return obj.component
 57
 58    def validate_expression(self, expression: str) -> str:
 59        """Test Syntax"""
 60        evaluator = PropertyMappingEvaluator(
 61            self.instance,
 62        )
 63        evaluator.validate(expression)
 64        return expression
 65
 66    class Meta:
 67        model = PropertyMapping
 68        fields = [
 69            "pk",
 70            "managed",
 71            "name",
 72            "expression",
 73            "component",
 74            "verbose_name",
 75            "verbose_name_plural",
 76            "meta_model_name",
 77        ]
 78
 79
class PropertyMappingFilterSet(FilterSet):
    """Filter for PropertyMapping"""

    # Multiple-value filter on the `managed` field, annotated as a string type
    # for the generated OpenAPI schema.
    managed = extend_schema_field(OpenApiTypes.STR)(AllValuesMultipleFilter(field_name="managed"))

    # Boolean filter applying the `isnull` lookup to the `managed` field.
    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")

    class Meta:
        model = PropertyMapping
        # `managed` is listed here and also declared explicitly above.
        fields = ["name", "managed"]
 90
 91
 92class PropertyMappingViewSet(
 93    TypesMixin,
 94    mixins.RetrieveModelMixin,
 95    mixins.DestroyModelMixin,
 96    UsedByMixin,
 97    mixins.ListModelMixin,
 98    GenericViewSet,
 99):
100    """PropertyMapping Viewset"""
101
102    class PropertyMappingTestSerializer(PolicyTestSerializer):
103        """Test property mapping execution for a user/group with context"""
104
105        user = PrimaryKeyRelatedField(queryset=User.objects.all(), required=False, allow_null=True)
106        group = PrimaryKeyRelatedField(
107            queryset=Group.objects.all(), required=False, allow_null=True
108        )
109
110    queryset = PropertyMapping.objects.select_subclasses()
111    serializer_class = PropertyMappingSerializer
112    filterset_class = PropertyMappingFilterSet
113    ordering = ["name"]
114    search_fields = ["name"]
115
116    @permission_required("authentik_core.view_propertymapping")
117    @extend_schema(
118        request=PropertyMappingTestSerializer(),
119        responses={
120            200: PropertyMappingTestResultSerializer,
121            400: OpenApiResponse(description="Invalid parameters"),
122        },
123        parameters=[
124            OpenApiParameter(
125                name="format_result",
126                location=OpenApiParameter.QUERY,
127                type=OpenApiTypes.BOOL,
128            )
129        ],
130    )
131    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
132    @validate(PropertyMappingTestSerializer)
133    def test(self, request: Request, pk: str, body: PropertyMappingTestSerializer) -> Response:
134        """Test Property Mapping"""
135        _mapping: PropertyMapping = self.get_object()
136        # Use `get_subclass` to get correct class and correct `.evaluate` implementation
137        mapping: PropertyMapping = PropertyMapping.objects.get_subclass(pk=_mapping.pk)
138        # FIXME: when we separate policy mappings between ones for sources
139        # and ones for providers, we need to make the user field optional for the source mapping
140        format_result = str(request.GET.get("format_result", "false")).lower() == "true"
141
142        context: dict = body.validated_data.get("context", {})
143        context.setdefault("user", None)
144
145        if user := body.validated_data.get("user"):
146            # User permission check, only allow mapping testing for users that are readable
147            users = get_objects_for_user(request.user, "authentik_core.view_user").filter(
148                pk=user.pk
149            )
150            if not users.exists():
151                raise PermissionDenied()
152            context["user"] = user
153        if group := body.validated_data.get("group"):
154            # Group permission check, only allow mapping testing for groups that are readable
155            groups = get_objects_for_user(request.user, "authentik_core.view_group").filter(
156                pk=group.pk
157            )
158            if not groups.exists():
159                raise PermissionDenied()
160            context["group"] = group
161        context["request"] = self.request
162
163        response_data = {"successful": True, "result": ""}
164        try:
165            result = mapping.evaluate(dry_run=True, **context)
166            response_data["result"] = dumps(
167                sanitize_item(result), indent=(4 if format_result else None)
168            )
169        except PropertyMappingExpressionException as exc:
170            response_data["result"] = exception_to_string(exc.exc)
171            response_data["successful"] = False
172        except Exception as exc:  # noqa
173            response_data["result"] = exception_to_string(exc)
174            response_data["successful"] = False
175        response = PropertyMappingTestResultSerializer(response_data)
176        return Response(response.data)
class PropertyMappingTestResultSerializer(authentik.core.api.utils.PassiveSerializer):
class PropertyMappingTestResultSerializer(PassiveSerializer):
    """Result of a Property-mapping test"""

    # Filled by the viewset's `test` action: the JSON dump of the evaluated value,
    # or the stringified exception when evaluation failed.
    result = CharField(read_only=True)
    # Set to False by the `test` action when evaluating the mapping raised.
    successful = BooleanField(read_only=True)

Result of a Property-mapping test

result
successful
class PropertyMappingSerializer(ManagedSerializer, ModelSerializer, MetaNameSerializer):
    """PropertyMapping Serializer"""

    # Computed, read-only field; its value comes from `get_component` below.
    component = SerializerMethodField()

    def get_component(self, obj: PropertyMapping) -> str:
        """Get object's component so that we know how to edit the object"""
        return obj.component

    def validate_expression(self, expression: str) -> str:
        """Test Syntax"""
        # The evaluator is built around the current instance (whatever
        # `self.instance` holds). Only the side effect of `validate` matters here;
        # presumably it raises on an invalid expression -- confirm in the evaluator.
        PropertyMappingEvaluator(self.instance).validate(expression)
        return expression

    class Meta:
        model = PropertyMapping
        # Order is kept as-is since it determines the serialized field order.
        fields = [
            "pk", "managed", "name", "expression", "component",
            "verbose_name", "verbose_name_plural", "meta_model_name",
        ]  # fmt: skip

PropertyMapping Serializer

component
def get_component(self, obj: authentik.core.models.PropertyMapping) -> str:
    def get_component(self, obj: PropertyMapping) -> str:
        """Get object's component so that we know how to edit the object"""
        # Plain attribute read on the mapping; supplies the value of the
        # serializer's `component` SerializerMethodField.
        return obj.component

Get object's component so that we know how to edit the object

def validate_expression(self, expression: str) -> str:
59    def validate_expression(self, expression: str) -> str:
60        """Test Syntax"""
61        evaluator = PropertyMappingEvaluator(
62            self.instance,
63        )
64        evaluator.validate(expression)
65        return expression

Test Syntax

class PropertyMappingSerializer.Meta:
    class Meta:
        # Serializer configuration: the model it is bound to and the fields it exposes.
        model = PropertyMapping
        # `component` is the computed SerializerMethodField declared on the serializer.
        # NOTE(review): the `verbose_name*` / `meta_model_name` entries presumably
        # come from MetaNameSerializer -- confirm there.
        fields = [
            "pk",
            "managed",
            "name",
            "expression",
            "component",
            "verbose_name",
            "verbose_name_plural",
            "meta_model_name",
        ]
fields = ['pk', 'managed', 'name', 'expression', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name']
class PropertyMappingFilterSet(django_filters.filterset.FilterSet):
class PropertyMappingFilterSet(FilterSet):
    """Filter for PropertyMapping"""

    # Multiple-value filter on the `managed` field, annotated as a string type
    # for the generated OpenAPI schema.
    managed = extend_schema_field(OpenApiTypes.STR)(AllValuesMultipleFilter(field_name="managed"))

    # Boolean filter applying the `isnull` lookup to the `managed` field.
    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")

    class Meta:
        model = PropertyMapping
        # `managed` is listed here and also declared explicitly above.
        fields = ["name", "managed"]

Filter for PropertyMapping

managed
managed__isnull
declared_filters = OrderedDict({'managed': <django_filters.filters.AllValuesMultipleFilter object>, 'managed__isnull': <django_filters.filters.BooleanFilter object>})
base_filters = OrderedDict({'name': <django_filters.filters.CharFilter object>, 'managed': <django_filters.filters.AllValuesMultipleFilter object>, 'managed__isnull': <django_filters.filters.BooleanFilter object>})
class PropertyMappingFilterSet.Meta:
    class Meta:
        # Filterset configuration: the model it is bound to and the model fields to filter on.
        model = PropertyMapping
        fields = ["name", "managed"]
fields = ['name', 'managed']
class PropertyMappingViewSet(authentik.core.api.object_types.TypesMixin, rest_framework.mixins.RetrieveModelMixin, rest_framework.mixins.DestroyModelMixin, authentik.core.api.used_by.UsedByMixin, rest_framework.mixins.ListModelMixin, rest_framework.viewsets.GenericViewSet):
 93class PropertyMappingViewSet(
 94    TypesMixin,
 95    mixins.RetrieveModelMixin,
 96    mixins.DestroyModelMixin,
 97    UsedByMixin,
 98    mixins.ListModelMixin,
 99    GenericViewSet,
100):
101    """PropertyMapping Viewset"""
102
103    class PropertyMappingTestSerializer(PolicyTestSerializer):
104        """Test property mapping execution for a user/group with context"""
105
106        user = PrimaryKeyRelatedField(queryset=User.objects.all(), required=False, allow_null=True)
107        group = PrimaryKeyRelatedField(
108            queryset=Group.objects.all(), required=False, allow_null=True
109        )
110
111    queryset = PropertyMapping.objects.select_subclasses()
112    serializer_class = PropertyMappingSerializer
113    filterset_class = PropertyMappingFilterSet
114    ordering = ["name"]
115    search_fields = ["name"]
116
117    @permission_required("authentik_core.view_propertymapping")
118    @extend_schema(
119        request=PropertyMappingTestSerializer(),
120        responses={
121            200: PropertyMappingTestResultSerializer,
122            400: OpenApiResponse(description="Invalid parameters"),
123        },
124        parameters=[
125            OpenApiParameter(
126                name="format_result",
127                location=OpenApiParameter.QUERY,
128                type=OpenApiTypes.BOOL,
129            )
130        ],
131    )
132    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
133    @validate(PropertyMappingTestSerializer)
134    def test(self, request: Request, pk: str, body: PropertyMappingTestSerializer) -> Response:
135        """Test Property Mapping"""
136        _mapping: PropertyMapping = self.get_object()
137        # Use `get_subclass` to get correct class and correct `.evaluate` implementation
138        mapping: PropertyMapping = PropertyMapping.objects.get_subclass(pk=_mapping.pk)
139        # FIXME: when we separate policy mappings between ones for sources
140        # and ones for providers, we need to make the user field optional for the source mapping
141        format_result = str(request.GET.get("format_result", "false")).lower() == "true"
142
143        context: dict = body.validated_data.get("context", {})
144        context.setdefault("user", None)
145
146        if user := body.validated_data.get("user"):
147            # User permission check, only allow mapping testing for users that are readable
148            users = get_objects_for_user(request.user, "authentik_core.view_user").filter(
149                pk=user.pk
150            )
151            if not users.exists():
152                raise PermissionDenied()
153            context["user"] = user
154        if group := body.validated_data.get("group"):
155            # Group permission check, only allow mapping testing for groups that are readable
156            groups = get_objects_for_user(request.user, "authentik_core.view_group").filter(
157                pk=group.pk
158            )
159            if not groups.exists():
160                raise PermissionDenied()
161            context["group"] = group
162        context["request"] = self.request
163
164        response_data = {"successful": True, "result": ""}
165        try:
166            result = mapping.evaluate(dry_run=True, **context)
167            response_data["result"] = dumps(
168                sanitize_item(result), indent=(4 if format_result else None)
169            )
170        except PropertyMappingExpressionException as exc:
171            response_data["result"] = exception_to_string(exc.exc)
172            response_data["successful"] = False
173        except Exception as exc:  # noqa
174            response_data["result"] = exception_to_string(exc)
175            response_data["successful"] = False
176        response = PropertyMappingTestResultSerializer(response_data)
177        return Response(response.data)

PropertyMapping Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'PropertyMappingSerializer'>
filterset_class = <class 'PropertyMappingFilterSet'>
ordering = ['name']
search_fields = ['name']
@permission_required('authentik_core.view_propertymapping')
@extend_schema(request=PropertyMappingTestSerializer(), responses={200: PropertyMappingTestResultSerializer, 400: OpenApiResponse(description='Invalid parameters')}, parameters=[OpenApiParameter(name='format_result', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)])
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(PropertyMappingTestSerializer)
def test( self, request: rest_framework.request.Request, pk: str, body: PropertyMappingViewSet.PropertyMappingTestSerializer) -> rest_framework.response.Response:
117    @permission_required("authentik_core.view_propertymapping")
118    @extend_schema(
119        request=PropertyMappingTestSerializer(),
120        responses={
121            200: PropertyMappingTestResultSerializer,
122            400: OpenApiResponse(description="Invalid parameters"),
123        },
124        parameters=[
125            OpenApiParameter(
126                name="format_result",
127                location=OpenApiParameter.QUERY,
128                type=OpenApiTypes.BOOL,
129            )
130        ],
131    )
132    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
133    @validate(PropertyMappingTestSerializer)
134    def test(self, request: Request, pk: str, body: PropertyMappingTestSerializer) -> Response:
135        """Test Property Mapping"""
136        _mapping: PropertyMapping = self.get_object()
137        # Use `get_subclass` to get correct class and correct `.evaluate` implementation
138        mapping: PropertyMapping = PropertyMapping.objects.get_subclass(pk=_mapping.pk)
139        # FIXME: when we separate policy mappings between ones for sources
140        # and ones for providers, we need to make the user field optional for the source mapping
141        format_result = str(request.GET.get("format_result", "false")).lower() == "true"
142
143        context: dict = body.validated_data.get("context", {})
144        context.setdefault("user", None)
145
146        if user := body.validated_data.get("user"):
147            # User permission check, only allow mapping testing for users that are readable
148            users = get_objects_for_user(request.user, "authentik_core.view_user").filter(
149                pk=user.pk
150            )
151            if not users.exists():
152                raise PermissionDenied()
153            context["user"] = user
154        if group := body.validated_data.get("group"):
155            # Group permission check, only allow mapping testing for groups that are readable
156            groups = get_objects_for_user(request.user, "authentik_core.view_group").filter(
157                pk=group.pk
158            )
159            if not groups.exists():
160                raise PermissionDenied()
161            context["group"] = group
162        context["request"] = self.request
163
164        response_data = {"successful": True, "result": ""}
165        try:
166            result = mapping.evaluate(dry_run=True, **context)
167            response_data["result"] = dumps(
168                sanitize_item(result), indent=(4 if format_result else None)
169            )
170        except PropertyMappingExpressionException as exc:
171            response_data["result"] = exception_to_string(exc.exc)
172            response_data["successful"] = False
173        except Exception as exc:  # noqa
174            response_data["result"] = exception_to_string(exc)
175            response_data["successful"] = False
176        response = PropertyMappingTestResultSerializer(response_data)
177        return Response(response.data)

Test Property Mapping

name = None
description = None
suffix = None
detail = None
basename = None
class PropertyMappingViewSet.PropertyMappingTestSerializer(authentik.policies.api.exec.PolicyTestSerializer):
    class PropertyMappingTestSerializer(PolicyTestSerializer):
        """Test property mapping execution for a user/group with context"""

        # Optional, nullable primary key of a User; when given, the `test` action
        # places the resolved object into the evaluation context as "user".
        user = PrimaryKeyRelatedField(queryset=User.objects.all(), required=False, allow_null=True)
        # Optional, nullable primary key of a Group; when given, the `test` action
        # places the resolved object into the evaluation context as "group".
        group = PrimaryKeyRelatedField(
            queryset=Group.objects.all(), required=False, allow_null=True
        )

Test property mapping execution for a user/group with context

user
group