authentik.policies.api.bindings
policy binding API Views
1"""policy binding API Views""" 2 3from collections import OrderedDict 4 5from django.core.exceptions import ObjectDoesNotExist 6from django_filters.filters import BooleanFilter, ModelMultipleChoiceFilter 7from django_filters.filterset import FilterSet 8from rest_framework.exceptions import ValidationError 9from rest_framework.serializers import PrimaryKeyRelatedField 10from rest_framework.viewsets import ModelViewSet 11 12from authentik.core.api.groups import PartialUserSerializer 13from authentik.core.api.used_by import UsedByMixin 14from authentik.core.api.users import PartialGroupSerializer 15from authentik.core.api.utils import ModelSerializer 16from authentik.policies.api.policies import PolicySerializer 17from authentik.policies.models import PolicyBinding, PolicyBindingModel 18 19 20class PolicyBindingModelForeignKey(PrimaryKeyRelatedField): 21 """rest_framework PrimaryKeyRelatedField which resolves 22 model_manager's InheritanceQuerySet""" 23 24 def use_pk_only_optimization(self): 25 return False 26 27 def to_internal_value(self, data): 28 if self.pk_field is not None: 29 data = self.pk_field.to_internal_value(data) 30 try: 31 # Due to inheritance, a direct DB lookup for the primary key 32 # won't return anything. 
This is because the direct lookup 33 # checks the PK of PolicyBindingModel (for example), 34 # but we get given the Primary Key of the inheriting class 35 for model in self.get_queryset().select_subclasses().all(): 36 if str(model.pk) == str(data): 37 return model 38 # as a fallback we still try a direct lookup 39 return self.get_queryset().get_subclass(pk=data) 40 except ObjectDoesNotExist: 41 self.fail("does_not_exist", pk_value=data) 42 except TypeError, ValueError: 43 self.fail("incorrect_type", data_type=type(data).__name__) 44 45 def to_representation(self, value): 46 correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid) 47 return correct_model.pk 48 49 50class PolicyBindingSerializer(ModelSerializer): 51 """PolicyBinding Serializer""" 52 53 # Because we're not interested in the PolicyBindingModel's PK but rather the subclasses PK, 54 # we have to manually declare this field 55 target = PolicyBindingModelForeignKey( 56 queryset=PolicyBindingModel.objects.select_subclasses(), 57 required=True, 58 ) 59 60 policy_obj = PolicySerializer(required=False, allow_null=True, read_only=True, source="policy") 61 group_obj = PartialGroupSerializer( 62 required=False, allow_null=True, read_only=True, source="group" 63 ) 64 user_obj = PartialUserSerializer(required=False, allow_null=True, read_only=True, source="user") 65 66 class Meta: 67 model = PolicyBinding 68 fields = [ 69 "pk", 70 "policy", 71 "group", 72 "user", 73 "policy_obj", 74 "group_obj", 75 "user_obj", 76 "target", 77 "negate", 78 "enabled", 79 "order", 80 "timeout", 81 "failure_result", 82 ] 83 84 def validate(self, attrs: OrderedDict) -> OrderedDict: 85 """Check that either policy, group or user is set.""" 86 target: PolicyBindingModel = attrs.get("target") 87 supported = target.supported_policy_binding_targets() 88 supported.sort() 89 count = sum([bool(attrs.get(x, None)) for x in supported]) 90 invalid = count > 1 91 empty = count < 1 92 warning = ", ".join(f"'{x}'" for x in 
supported) 93 if invalid: 94 raise ValidationError(f"Only one of {warning} can be set.") 95 if empty: 96 raise ValidationError(f"One of {warning} must be set.") 97 return attrs 98 99 100class PolicyBindingFilter(FilterSet): 101 """Filter for PolicyBindings""" 102 103 target_in = ModelMultipleChoiceFilter( 104 field_name="target__pbm_uuid", 105 to_field_name="pbm_uuid", 106 queryset=PolicyBindingModel.objects.select_subclasses(), 107 ) 108 policy__isnull = BooleanFilter("policy", "isnull") 109 110 class Meta: 111 model = PolicyBinding 112 fields = ["policy", "policy__isnull", "target", "target_in", "enabled", "order", "timeout"] 113 114 115class PolicyBindingViewSet(UsedByMixin, ModelViewSet): 116 """PolicyBinding Viewset""" 117 118 queryset = ( 119 PolicyBinding.objects.all() 120 .select_related("target", "group", "user") 121 .prefetch_related("policy") 122 ) # prefetching policy so we resolve the subclass 123 serializer_class = PolicyBindingSerializer 124 search_fields = ["policy__name"] 125 filterset_class = PolicyBindingFilter 126 ordering = ["order", "pk"] 127 ordering_fields = ["order", "target__uuid", "pk"]
class
PolicyBindingModelForeignKey(rest_framework.relations.PrimaryKeyRelatedField):
class PolicyBindingModelForeignKey(PrimaryKeyRelatedField):
    """rest_framework PrimaryKeyRelatedField which resolves
    model_manager's InheritanceQuerySet"""

    def use_pk_only_optimization(self):
        """Always return False.

        ``to_representation`` below reads ``value.pbm_uuid``, so it needs an
        object exposing that attribute rather than a bare primary key.
        """
        return False

    def to_internal_value(self, data):
        """Resolve a submitted primary key to the matching subclass instance.

        ``data`` is first passed through ``pk_field`` when one is configured.
        It is then compared, as a string, against the primary key of every
        object in the field's queryset (resolved to its subclass); if nothing
        matches, a direct ``pk`` lookup is attempted.

        Failures are reported through ``self.fail``: ``does_not_exist`` when
        the lookup raises ``ObjectDoesNotExist``, ``incorrect_type`` when it
        raises ``TypeError`` or ``ValueError``.
        """
        if self.pk_field is not None:
            data = self.pk_field.to_internal_value(data)
        try:
            # Due to inheritance, a direct DB lookup for the primary key
            # won't return anything. This is because the direct lookup
            # checks the PK of PolicyBindingModel (for example),
            # but we get given the Primary Key of the inheriting class
            wanted = str(data)  # loop-invariant, so stringify only once
            for model in self.get_queryset().select_subclasses().all():
                if str(model.pk) == wanted:
                    return model
            # as a fallback we still try a direct lookup
            return self.get_queryset().get_subclass(pk=data)
        except ObjectDoesNotExist:
            self.fail("does_not_exist", pk_value=data)
        except (TypeError, ValueError):
            # Parenthesised so the clause parses on every Python 3 release,
            # not only on interpreters implementing PEP 758.
            self.fail("incorrect_type", data_type=type(data).__name__)

    def to_representation(self, value):
        """Return the primary key of the subclass instance behind ``value``.

        The subclass is looked up through ``value.pbm_uuid``.
        """
        correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid)
        return correct_model.pk
rest_framework PrimaryKeyRelatedField which resolves model_manager's InheritanceQuerySet
def
to_internal_value(self, data):
def to_internal_value(self, data):
    """Resolve a submitted primary key to the matching subclass instance.

    ``data`` is first passed through ``pk_field`` when one is configured.
    It is then compared, as a string, against the primary key of every
    object in the field's queryset (resolved to its subclass); if nothing
    matches, a direct ``pk`` lookup is attempted.

    Failures are reported through ``self.fail``: ``does_not_exist`` when
    the lookup raises ``ObjectDoesNotExist``, ``incorrect_type`` when it
    raises ``TypeError`` or ``ValueError``.
    """
    if self.pk_field is not None:
        data = self.pk_field.to_internal_value(data)
    try:
        # Due to inheritance, a direct DB lookup for the primary key
        # won't return anything. This is because the direct lookup
        # checks the PK of PolicyBindingModel (for example),
        # but we get given the Primary Key of the inheriting class
        wanted = str(data)  # loop-invariant, so stringify only once
        for model in self.get_queryset().select_subclasses().all():
            if str(model.pk) == wanted:
                return model
        # as a fallback we still try a direct lookup
        return self.get_queryset().get_subclass(pk=data)
    except ObjectDoesNotExist:
        self.fail("does_not_exist", pk_value=data)
    except (TypeError, ValueError):
        # Parenthesised so the clause parses on every Python 3 release,
        # not only on interpreters implementing PEP 758.
        self.fail("incorrect_type", data_type=type(data).__name__)
Transform the incoming primitive data into a native value.
class PolicyBindingSerializer(ModelSerializer):
    """PolicyBinding Serializer"""

    # Because we're not interested in the PolicyBindingModel's PK but rather the subclasses PK,
    # we have to manually declare this field
    target = PolicyBindingModelForeignKey(
        queryset=PolicyBindingModel.objects.select_subclasses(),
        required=True,
    )

    # Read-only nested representations of the bound policy / group / user
    policy_obj = PolicySerializer(required=False, allow_null=True, read_only=True, source="policy")
    group_obj = PartialGroupSerializer(
        required=False, allow_null=True, read_only=True, source="group"
    )
    user_obj = PartialUserSerializer(required=False, allow_null=True, read_only=True, source="user")

    class Meta:
        model = PolicyBinding
        fields = [
            "pk",
            "policy",
            "group",
            "user",
            "policy_obj",
            "group_obj",
            "user_obj",
            "target",
            "negate",
            "enabled",
            "order",
            "timeout",
            "failure_result",
        ]

    def validate(self, attrs: OrderedDict) -> OrderedDict:
        """Check that exactly one of the target's supported binding fields is set.

        The field names come from ``target.supported_policy_binding_targets()``.
        On a partial update, values missing from ``attrs`` (including
        ``target`` itself) are read from the binding being updated, so the
        check covers the state the binding would end up in.

        Returns ``attrs`` unchanged. Raises ``ValidationError`` when no target
        can be determined, or when none or more than one of the supported
        fields is set.
        """
        # Only partial updates fall back to the stored binding; full
        # create/update requests are validated on the submitted data alone.
        instance = self.instance if self.partial else None
        target = attrs.get("target")
        if target is None:
            target = getattr(instance, "target", None)
        if target is None:
            raise ValidationError("'target' must be set.")
        # sorted() works on a copy, so the list handed out by the target is
        # never reordered in place.
        supported = sorted(target.supported_policy_binding_targets())
        count = sum(
            bool(attrs[name] if name in attrs else getattr(instance, name, None))
            for name in supported
        )
        warning = ", ".join(f"'{x}'" for x in supported)
        if count > 1:
            raise ValidationError(f"Only one of {warning} can be set.")
        if count < 1:
            raise ValidationError(f"One of {warning} must be set.")
        return attrs
PolicyBinding Serializer
def
validate(self, attrs: collections.OrderedDict) -> collections.OrderedDict:
def validate(self, attrs: OrderedDict) -> OrderedDict:
    """Check that exactly one of the target's supported binding fields is set.

    The field names come from ``target.supported_policy_binding_targets()``.
    On a partial update, values missing from ``attrs`` (including
    ``target`` itself) are read from the binding being updated, so the
    check covers the state the binding would end up in.

    Returns ``attrs`` unchanged. Raises ``ValidationError`` when no target
    can be determined, or when none or more than one of the supported
    fields is set.
    """
    # Only partial updates fall back to the stored binding; full
    # create/update requests are validated on the submitted data alone.
    instance = self.instance if self.partial else None
    target = attrs.get("target")
    if target is None:
        target = getattr(instance, "target", None)
    if target is None:
        raise ValidationError("'target' must be set.")
    # sorted() works on a copy, so the list handed out by the target is
    # never reordered in place.
    supported = sorted(target.supported_policy_binding_targets())
    count = sum(
        bool(attrs[name] if name in attrs else getattr(instance, name, None))
        for name in supported
    )
    warning = ", ".join(f"'{x}'" for x in supported)
    if count > 1:
        raise ValidationError(f"Only one of {warning} can be set.")
    if count < 1:
        raise ValidationError(f"One of {warning} must be set.")
    return attrs
Check that either policy, group or user is set.
Inherited Members
class
PolicyBindingSerializer.Meta:
class Meta:
    # Model the serializer is built for
    model = PolicyBinding
    # Names exposed by the serializer, in output order
    fields = [
        # identity and the bound objects
        "pk",
        "policy",
        "group",
        "user",
        # "*_obj" counterparts of the three entries above
        "policy_obj",
        "group_obj",
        "user_obj",
        # what the binding is attached to
        "target",
        # remaining binding settings
        "negate",
        "enabled",
        "order",
        "timeout",
        "failure_result",
    ]
model =
<class 'authentik.policies.models.PolicyBinding'>
class
PolicyBindingFilter(django_filters.filterset.FilterSet):
class PolicyBindingFilter(FilterSet):
    """Filter for PolicyBindings"""

    # Match bindings whose target is any of the given pbm_uuid values
    target_in = ModelMultipleChoiceFilter(
        queryset=PolicyBindingModel.objects.select_subclasses(),
        field_name="target__pbm_uuid",
        to_field_name="pbm_uuid",
    )
    # Applies the "isnull" lookup to the policy field
    policy__isnull = BooleanFilter(field_name="policy", lookup_expr="isnull")

    class Meta:
        model = PolicyBinding
        fields = [
            "policy",
            "policy__isnull",
            "target",
            "target_in",
            "enabled",
            "order",
            "timeout",
        ]
Filter for PolicyBindings
declared_filters =
OrderedDict({'target_in': <django_filters.filters.ModelMultipleChoiceFilter object>, 'policy__isnull': <django_filters.filters.BooleanFilter object>})
base_filters =
OrderedDict({'policy': <django_filters.filters.ModelChoiceFilter object>, 'policy__isnull': <django_filters.filters.BooleanFilter object>, 'target': <django_filters.filters.ModelChoiceFilter object>, 'target_in': <django_filters.filters.ModelMultipleChoiceFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'order': <django_filters.filters.NumberFilter object>, 'timeout': <django_filters.filters.NumberFilter object>})
class
PolicyBindingFilter.Meta:
class Meta:
    # Model whose fields the filter set operates on
    model = PolicyBinding
    # Filter names made available on the filter set
    fields = [
        "policy",
        "policy__isnull",
        "target",
        "target_in",
        "enabled",
        "order",
        "timeout",
    ]
model =
<class 'authentik.policies.models.PolicyBinding'>
class
PolicyBindingViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class PolicyBindingViewSet(UsedByMixin, ModelViewSet):
    """PolicyBinding Viewset"""

    queryset = (
        PolicyBinding.objects.all()
        .select_related("target", "group", "user")
        .prefetch_related("policy")
    )  # prefetching policy so we resolve the subclass
    serializer_class = PolicyBindingSerializer
    search_fields = ["policy__name"]
    filterset_class = PolicyBindingFilter
    # Default ordering when the request does not ask for one
    ordering = ["order", "pk"]
    # NOTE(review): the target's UUID field is called pbm_uuid everywhere else
    # in this module ("target__pbm_uuid" in the filter, get_subclass(pbm_uuid=...)
    # in the serializer field); confirm against the PolicyBindingModel definition.
    ordering_fields = ["order", "target__pbm_uuid", "pk"]
PolicyBinding Viewset
serializer_class =
<class 'PolicyBindingSerializer'>
filterset_class =
<class 'PolicyBindingFilter'>