authentik.core.api.property_mappings
PropertyMapping API Views
"""PropertyMapping API Views"""

from json import dumps

from django_filters.filters import AllValuesMultipleFilter, BooleanFilter
from django_filters.filterset import FilterSet
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import (
    OpenApiParameter,
    OpenApiResponse,
    extend_schema,
    extend_schema_field,
)
from guardian.shortcuts import get_objects_for_user
from rest_framework import mixins
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied
from rest_framework.fields import BooleanField, CharField, SerializerMethodField
from rest_framework.relations import PrimaryKeyRelatedField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet

from authentik.api.validation import validate
from authentik.blueprints.api import ManagedSerializer
from authentik.core.api.object_types import TypesMixin
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import (
    MetaNameSerializer,
    ModelSerializer,
    PassiveSerializer,
)
from authentik.core.expression.evaluator import PropertyMappingEvaluator
from authentik.core.expression.exceptions import PropertyMappingExpressionException
from authentik.core.models import Group, PropertyMapping, User
from authentik.events.utils import sanitize_item
from authentik.lib.utils.errors import exception_to_string
from authentik.policies.api.exec import PolicyTestSerializer
from authentik.rbac.decorators import permission_required

# NOTE: docstrings in this module are kept verbatim on purpose; drf-spectacular may
# surface them as descriptions in the generated schema (confirm before rewording).


class PropertyMappingTestResultSerializer(PassiveSerializer):
    """Result of a Property-mapping test"""

    # Text outcome of a test run: the JSON-encoded value, or the exception text.
    result = CharField(read_only=True)
    # False when evaluating the mapping (or encoding its result) raised.
    successful = BooleanField(read_only=True)


class PropertyMappingSerializer(ManagedSerializer, ModelSerializer, MetaNameSerializer):
    """PropertyMapping Serializer"""

    # Filled in by `get_component` below.
    component = SerializerMethodField()

    def get_component(self, obj: PropertyMapping) -> str:
        """Get object's component so that we know how to edit the object"""
        component = obj.component
        return component

    def validate_expression(self, expression: str) -> str:
        """Test Syntax"""
        # The evaluator is built around the instance being edited (if any); the
        # expression itself is handed back untouched once `validate` returns.
        PropertyMappingEvaluator(self.instance).validate(expression)
        return expression

    class Meta:
        model = PropertyMapping
        # Order of this list is preserved as-is.
        fields = [
            "pk", "managed", "name", "expression",
            "component",
            "verbose_name", "verbose_name_plural", "meta_model_name",
        ]


class PropertyMappingFilterSet(FilterSet):
    """Filter for PropertyMapping"""

    # Multi-value filter on `managed`, annotated as a string for the schema.
    managed = extend_schema_field(OpenApiTypes.STR)(
        AllValuesMultipleFilter(field_name="managed"),
    )

    # Boolean filter using the `isnull` lookup on the same field.
    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")

    class Meta:
        model = PropertyMapping
        fields = ["name", "managed"]


class PropertyMappingViewSet(
    TypesMixin,
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    UsedByMixin,
    mixins.ListModelMixin,
    GenericViewSet,
):
    """PropertyMapping Viewset"""

    # Of the DRF model mixins only retrieve, destroy and list are part of the bases.

    class PropertyMappingTestSerializer(PolicyTestSerializer):
        """Test property mapping execution for a user/group with context"""

        # Both references are optional and may be sent as null.
        user = PrimaryKeyRelatedField(
            queryset=User.objects.all(),
            required=False,
            allow_null=True,
        )
        group = PrimaryKeyRelatedField(
            queryset=Group.objects.all(),
            required=False,
            allow_null=True,
        )

    queryset = PropertyMapping.objects.select_subclasses()
    serializer_class = PropertyMappingSerializer
    filterset_class = PropertyMappingFilterSet
    ordering = ["name"]
    search_fields = ["name"]

    # The decorator stack is order-sensitive and reproduced exactly.
    @permission_required("authentik_core.view_propertymapping")
    @extend_schema(
        request=PropertyMappingTestSerializer(),
        responses={
            200: PropertyMappingTestResultSerializer,
            400: OpenApiResponse(description="Invalid parameters"),
        },
        parameters=[
            OpenApiParameter(
                name="format_result",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            )
        ],
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(PropertyMappingTestSerializer)
    def test(self, request: Request, pk: str, body: PropertyMappingTestSerializer) -> Response:
        """Test Property Mapping"""
        base_mapping: PropertyMapping = self.get_object()
        # Use `get_subclass` to get correct class and correct `.evaluate` implementation
        mapping: PropertyMapping = PropertyMapping.objects.get_subclass(pk=base_mapping.pk)
        # FIXME: when we separate policy mappings between ones for sources
        # and ones for providers, we need to make the user field optional for the source mapping
        # Only the literal "true" (any casing) in the query string turns indentation on.
        pretty = str(request.GET.get("format_result", "false")).lower() == "true"
        json_indent = 4 if pretty else None

        payload = body.validated_data
        context: dict = payload.get("context", {})
        context.setdefault("user", None)

        # Permission check, only allow mapping testing for users/groups that are readable
        # by the requesting user; the user is handled before the group.
        readable_checks = (
            ("user", "authentik_core.view_user"),
            ("group", "authentik_core.view_group"),
        )
        for key, permission in readable_checks:
            target = payload.get(key)
            if not target:
                continue
            readable = get_objects_for_user(request.user, permission).filter(pk=target.pk)
            if not readable.exists():
                raise PermissionDenied()
            context[key] = target
        context["request"] = self.request

        # Failures during evaluation or JSON encoding are reported in the response body
        # rather than raised.
        try:
            evaluated = mapping.evaluate(dry_run=True, **context)
            outcome = {
                "successful": True,
                "result": dumps(sanitize_item(evaluated), indent=json_indent),
            }
        except PropertyMappingExpressionException as exc:
            # Report the wrapped exception instead of the wrapper.
            outcome = {"successful": False, "result": exception_to_string(exc.exc)}
        except Exception as exc:  # noqa
            outcome = {"successful": False, "result": exception_to_string(exc)}
        return Response(PropertyMappingTestResultSerializer(outcome).data)
class PropertyMappingTestResultSerializer(PassiveSerializer):
    """Result of a Property-mapping test"""

    # NOTE: the docstring above is kept verbatim; drf-spectacular may surface
    # serializer docstrings as schema descriptions (confirm before rewording).

    # Text outcome of a test run; declared read-only.
    result = CharField(read_only=True)
    # Whether the test run succeeded; declared read-only.
    successful = BooleanField(read_only=True)
Result of a Property-mapping test
Inherited Members
class
PropertyMappingSerializer(authentik.blueprints.api.ManagedSerializer, authentik.core.api.utils.ModelSerializer, authentik.core.api.utils.MetaNameSerializer):
class PropertyMappingSerializer(ManagedSerializer, ModelSerializer, MetaNameSerializer):
    """PropertyMapping Serializer"""

    # NOTE: docstrings in this class are kept verbatim; drf-spectacular may surface
    # them as schema descriptions (confirm before rewording).

    # Filled in by `get_component` below.
    component = SerializerMethodField()

    def get_component(self, obj: PropertyMapping) -> str:
        """Get object's component so that we know how to edit the object"""
        component = obj.component
        return component

    def validate_expression(self, expression: str) -> str:
        """Test Syntax"""
        # The evaluator is built around the instance being edited (if any); the
        # expression itself is handed back untouched once `validate` returns.
        PropertyMappingEvaluator(self.instance).validate(expression)
        return expression

    class Meta:
        model = PropertyMapping
        # Order of this list is preserved as-is.
        fields = [
            "pk", "managed", "name", "expression",
            "component",
            "verbose_name", "verbose_name_plural", "meta_model_name",
        ]
PropertyMapping Serializer
def get_component(self, obj: PropertyMapping) -> str:
    """Get object's component so that we know how to edit the object"""
    # Plain pass-through of the `component` attribute of the given mapping.
    component = obj.component
    return component
Get object's component so that we know how to edit the object
class
PropertyMappingSerializer.Meta:
class Meta:
    # Serializer options: the model to serialize and the fields to expose.
    model = PropertyMapping
    # Order of this list is preserved as-is.
    fields = [
        "pk", "managed", "name", "expression",
        "component",
        "verbose_name", "verbose_name_plural", "meta_model_name",
    ]
model =
<class 'authentik.core.models.PropertyMapping'>
class
PropertyMappingFilterSet(django_filters.filterset.FilterSet):
class PropertyMappingFilterSet(FilterSet):
    """Filter for PropertyMapping"""

    # Multi-value filter on `managed`, annotated as a string for the schema.
    # Declared inline (no helper attribute) so no extra filter gets registered.
    managed = extend_schema_field(OpenApiTypes.STR)(
        AllValuesMultipleFilter(field_name="managed"),
    )

    # Boolean filter using the `isnull` lookup on the same field.
    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")

    class Meta:
        model = PropertyMapping
        fields = ["name", "managed"]
Filter for PropertyMapping
class
PropertyMappingFilterSet.Meta:
model =
<class 'authentik.core.models.PropertyMapping'>
class
PropertyMappingViewSet(authentik.core.api.object_types.TypesMixin, rest_framework.mixins.RetrieveModelMixin, rest_framework.mixins.DestroyModelMixin, authentik.core.api.used_by.UsedByMixin, rest_framework.mixins.ListModelMixin, rest_framework.viewsets.GenericViewSet):
class PropertyMappingViewSet(
    TypesMixin,
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    UsedByMixin,
    mixins.ListModelMixin,
    GenericViewSet,
):
    """PropertyMapping Viewset"""

    # NOTE: docstrings in this class are kept verbatim; drf-spectacular may surface
    # them as schema descriptions (confirm before rewording).
    # Of the DRF model mixins only retrieve, destroy and list are part of the bases.

    class PropertyMappingTestSerializer(PolicyTestSerializer):
        """Test property mapping execution for a user/group with context"""

        # Both references are optional and may be sent as null.
        user = PrimaryKeyRelatedField(
            queryset=User.objects.all(),
            required=False,
            allow_null=True,
        )
        group = PrimaryKeyRelatedField(
            queryset=Group.objects.all(),
            required=False,
            allow_null=True,
        )

    queryset = PropertyMapping.objects.select_subclasses()
    serializer_class = PropertyMappingSerializer
    filterset_class = PropertyMappingFilterSet
    ordering = ["name"]
    search_fields = ["name"]

    # The decorator stack is order-sensitive and reproduced exactly.
    @permission_required("authentik_core.view_propertymapping")
    @extend_schema(
        request=PropertyMappingTestSerializer(),
        responses={
            200: PropertyMappingTestResultSerializer,
            400: OpenApiResponse(description="Invalid parameters"),
        },
        parameters=[
            OpenApiParameter(
                name="format_result",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            )
        ],
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(PropertyMappingTestSerializer)
    def test(self, request: Request, pk: str, body: PropertyMappingTestSerializer) -> Response:
        """Test Property Mapping"""
        base_mapping: PropertyMapping = self.get_object()
        # Use `get_subclass` to get correct class and correct `.evaluate` implementation
        mapping: PropertyMapping = PropertyMapping.objects.get_subclass(pk=base_mapping.pk)
        # FIXME: when we separate policy mappings between ones for sources
        # and ones for providers, we need to make the user field optional for the source mapping
        # Only the literal "true" (any casing) in the query string turns indentation on.
        pretty = str(request.GET.get("format_result", "false")).lower() == "true"
        json_indent = 4 if pretty else None

        payload = body.validated_data
        context: dict = payload.get("context", {})
        context.setdefault("user", None)

        # Permission check, only allow mapping testing for users/groups that are readable
        # by the requesting user; the user is handled before the group.
        readable_checks = (
            ("user", "authentik_core.view_user"),
            ("group", "authentik_core.view_group"),
        )
        for key, permission in readable_checks:
            target = payload.get(key)
            if not target:
                continue
            readable = get_objects_for_user(request.user, permission).filter(pk=target.pk)
            if not readable.exists():
                raise PermissionDenied()
            context[key] = target
        context["request"] = self.request

        # Failures during evaluation or JSON encoding are reported in the response body
        # rather than raised.
        try:
            evaluated = mapping.evaluate(dry_run=True, **context)
            outcome = {
                "successful": True,
                "result": dumps(sanitize_item(evaluated), indent=json_indent),
            }
        except PropertyMappingExpressionException as exc:
            # Report the wrapped exception instead of the wrapper.
            outcome = {"successful": False, "result": exception_to_string(exc.exc)}
        except Exception as exc:  # noqa
            outcome = {"successful": False, "result": exception_to_string(exc)}
        return Response(PropertyMappingTestResultSerializer(outcome).data)
PropertyMapping Viewset
serializer_class =
<class 'PropertyMappingSerializer'>
filterset_class =
<class 'PropertyMappingFilterSet'>
@extend_schema(request=PropertyMappingTestSerializer(), responses={200: PropertyMappingTestResultSerializer, 400: OpenApiResponse(description='Invalid parameters')}, parameters=[OpenApiParameter(name='format_result', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)])
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(PropertyMappingTestSerializer)
def
test( self, request: rest_framework.request.Request, pk: str, body: PropertyMappingViewSet.PropertyMappingTestSerializer) -> rest_framework.response.Response:
# The decorator stack is order-sensitive and reproduced exactly.
@permission_required("authentik_core.view_propertymapping")
@extend_schema(
    request=PropertyMappingTestSerializer(),
    responses={
        200: PropertyMappingTestResultSerializer,
        400: OpenApiResponse(description="Invalid parameters"),
    },
    parameters=[
        OpenApiParameter(
            name="format_result",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.BOOL,
        )
    ],
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
@validate(PropertyMappingTestSerializer)
def test(self, request: Request, pk: str, body: PropertyMappingTestSerializer) -> Response:
    """Test Property Mapping"""
    # NOTE: docstring kept verbatim; it may be surfaced as the operation description.
    base_mapping: PropertyMapping = self.get_object()
    # Use `get_subclass` to get correct class and correct `.evaluate` implementation
    mapping: PropertyMapping = PropertyMapping.objects.get_subclass(pk=base_mapping.pk)
    # FIXME: when we separate policy mappings between ones for sources
    # and ones for providers, we need to make the user field optional for the source mapping
    # Only the literal "true" (any casing) in the query string turns indentation on.
    pretty = str(request.GET.get("format_result", "false")).lower() == "true"
    json_indent = 4 if pretty else None

    payload = body.validated_data
    context: dict = payload.get("context", {})
    context.setdefault("user", None)

    # Permission check, only allow mapping testing for users/groups that are readable
    # by the requesting user; the user is handled before the group.
    readable_checks = (
        ("user", "authentik_core.view_user"),
        ("group", "authentik_core.view_group"),
    )
    for key, permission in readable_checks:
        target = payload.get(key)
        if not target:
            continue
        readable = get_objects_for_user(request.user, permission).filter(pk=target.pk)
        if not readable.exists():
            raise PermissionDenied()
        context[key] = target
    context["request"] = self.request

    # Failures during evaluation or JSON encoding are reported in the response body
    # rather than raised.
    try:
        evaluated = mapping.evaluate(dry_run=True, **context)
        outcome = {
            "successful": True,
            "result": dumps(sanitize_item(evaluated), indent=json_indent),
        }
    except PropertyMappingExpressionException as exc:
        # Report the wrapped exception instead of the wrapper.
        outcome = {"successful": False, "result": exception_to_string(exc.exc)}
    except Exception as exc:  # noqa
        outcome = {"successful": False, "result": exception_to_string(exc)}
    return Response(PropertyMappingTestResultSerializer(outcome).data)
Test Property Mapping
class
PropertyMappingViewSet.PropertyMappingTestSerializer(authentik.policies.api.exec.PolicyTestSerializer):
class PropertyMappingTestSerializer(PolicyTestSerializer):
    """Test property mapping execution for a user/group with context"""

    # NOTE: the docstring above is kept verbatim; drf-spectacular may surface
    # serializer docstrings as schema descriptions (confirm before rewording).

    # Both references are optional and may be sent as null.
    user = PrimaryKeyRelatedField(
        queryset=User.objects.all(),
        required=False,
        allow_null=True,
    )
    group = PrimaryKeyRelatedField(
        queryset=Group.objects.all(),
        required=False,
        allow_null=True,
    )
Test property mapping execution for a user/group with context