authentik.policies.api.bindings
policy binding API Views
"""policy binding API Views"""

from collections import OrderedDict

from django.core.exceptions import ObjectDoesNotExist
from django_filters.filters import BooleanFilter, ModelMultipleChoiceFilter
from django_filters.filterset import FilterSet
from rest_framework.exceptions import ValidationError
from rest_framework.serializers import PrimaryKeyRelatedField
from rest_framework.viewsets import ModelViewSet

from authentik.core.api.groups import PartialUserSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.users import PartialGroupSerializer
from authentik.core.api.utils import ModelSerializer
from authentik.policies.api.policies import PolicySerializer
from authentik.policies.models import PolicyBinding, PolicyBindingModel


class PolicyBindingModelForeignKey(PrimaryKeyRelatedField):
    """rest_framework PrimaryKeyRelatedField which resolves
    model_manager's InheritanceQuerySet"""

    def use_pk_only_optimization(self):
        """Opt out of the PK-only shortcut; to_representation reads `value.pbm_uuid`."""
        return False

    def to_internal_value(self, data):
        """Resolve the submitted primary key to the matching subclass instance.

        Fails with the field's `does_not_exist` error when nothing matches and with
        `incorrect_type` when the lookup raises TypeError or ValueError.
        """
        if self.pk_field is not None:
            data = self.pk_field.to_internal_value(data)
        try:
            # Due to inheritance, a direct DB lookup for the primary key
            # won't return anything. This is because the direct lookup
            # checks the PK of PolicyBindingModel (for example),
            # but we get given the Primary Key of the inheriting class
            wanted = str(data)
            for model in self.get_queryset().select_subclasses().all():
                if str(model.pk) == wanted:
                    return model
            # as a fallback we still try a direct lookup
            return self.get_queryset().get_subclass(pk=data)
        except ObjectDoesNotExist:
            self.fail("does_not_exist", pk_value=data)
        # Parenthesised tuple: the bare `except A, B:` form only parses on Python 3.14+
        except (TypeError, ValueError):
            self.fail("incorrect_type", data_type=type(data).__name__)

    def to_representation(self, value):
        """Return the primary key of the subclass instance sharing `value.pbm_uuid`."""
        correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid)
        return correct_model.pk


class PolicyBindingSerializer(ModelSerializer):
    """PolicyBinding Serializer"""

    # Because we're not interested in the PolicyBindingModel's PK but rather the subclasses PK,
    # we have to manually declare this field
    target = PolicyBindingModelForeignKey(
        queryset=PolicyBindingModel.objects.select_subclasses(),
        required=True,
    )

    # Read-only nested representations of the bound policy, group and user
    policy_obj = PolicySerializer(required=False, read_only=True, source="policy")
    group_obj = PartialGroupSerializer(required=False, read_only=True, source="group")
    user_obj = PartialUserSerializer(required=False, read_only=True, source="user")

    class Meta:
        model = PolicyBinding
        fields = [
            "pk",
            "policy",
            "group",
            "user",
            "policy_obj",
            "group_obj",
            "user_obj",
            "target",
            "negate",
            "enabled",
            "order",
            "timeout",
            "failure_result",
        ]

    def validate(self, attrs: OrderedDict) -> OrderedDict:
        """Check that exactly one of the target's supported binding fields is set.

        The candidate field names come from `target.supported_policy_binding_targets()`.
        Raises ValidationError when none or more than one of them has a truthy value,
        or when no target was submitted at all.
        """
        target: PolicyBindingModel = attrs.get("target")
        if target is None:
            # A partial update may omit `target`; report that instead of failing
            # with an AttributeError on the call below.
            raise ValidationError({"target": "This field is required."})
        # sorted() copies, so the list owned by the target is never reordered in place
        supported = sorted(target.supported_policy_binding_targets())
        count = sum(bool(attrs.get(x)) for x in supported)
        warning = ", ".join(f"'{x}'" for x in supported)
        if count > 1:
            raise ValidationError(f"Only one of {warning} can be set.")
        if count < 1:
            raise ValidationError(f"One of {warning} must be set.")
        return attrs


class PolicyBindingFilter(FilterSet):
    """Filter for PolicyBindings"""

    # Matches bindings whose target's pbm_uuid is one of the submitted values
    target_in = ModelMultipleChoiceFilter(
        field_name="target__pbm_uuid",
        to_field_name="pbm_uuid",
        queryset=PolicyBindingModel.objects.select_subclasses(),
    )
    policy__isnull = BooleanFilter("policy", "isnull")

    class Meta:
        model = PolicyBinding
        fields = ["policy", "policy__isnull", "target", "target_in", "enabled", "order", "timeout"]


class PolicyBindingViewSet(UsedByMixin, ModelViewSet):
    """PolicyBinding Viewset"""

    queryset = (
        PolicyBinding.objects.all()
        .select_related("target", "group", "user")
        .prefetch_related("policy")
    )  # prefetching policy so we resolve the subclass
    serializer_class = PolicyBindingSerializer
    search_fields = ["policy__name"]
    filterset_class = PolicyBindingFilter
    ordering = ["order", "pk"]
    # NOTE(review): the rest of this module addresses the target key as `pbm_uuid`;
    # confirm that `target__uuid` is a valid lookup.
    ordering_fields = ["order", "target__uuid", "pk"]
class
PolicyBindingModelForeignKey(rest_framework.relations.PrimaryKeyRelatedField):
class PolicyBindingModelForeignKey(PrimaryKeyRelatedField):
    """rest_framework PrimaryKeyRelatedField which resolves
    model_manager's InheritanceQuerySet"""

    def use_pk_only_optimization(self):
        """Opt out of the PK-only shortcut; to_representation reads `value.pbm_uuid`."""
        return False

    def to_internal_value(self, data):
        """Resolve the submitted primary key to the matching subclass instance.

        Fails with the field's `does_not_exist` error when nothing matches and with
        `incorrect_type` when the lookup raises TypeError or ValueError.
        """
        if self.pk_field is not None:
            data = self.pk_field.to_internal_value(data)
        try:
            # Due to inheritance, a direct DB lookup for the primary key
            # won't return anything. This is because the direct lookup
            # checks the PK of PolicyBindingModel (for example),
            # but we get given the Primary Key of the inheriting class
            wanted = str(data)
            for model in self.get_queryset().select_subclasses().all():
                if str(model.pk) == wanted:
                    return model
            # as a fallback we still try a direct lookup
            return self.get_queryset().get_subclass(pk=data)
        except ObjectDoesNotExist:
            self.fail("does_not_exist", pk_value=data)
        # Parenthesised tuple: the bare `except A, B:` form only parses on Python 3.14+
        except (TypeError, ValueError):
            self.fail("incorrect_type", data_type=type(data).__name__)

    def to_representation(self, value):
        """Return the primary key of the subclass instance sharing `value.pbm_uuid`."""
        correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid)
        return correct_model.pk
rest_framework PrimaryKeyRelatedField which resolves model_manager's InheritanceQuerySet
def
to_internal_value(self, data):
def to_internal_value(self, data):
    """Resolve the submitted primary key to the matching subclass instance.

    Fails with the field's `does_not_exist` error when nothing matches and with
    `incorrect_type` when the lookup raises TypeError or ValueError.
    """
    if self.pk_field is not None:
        data = self.pk_field.to_internal_value(data)
    try:
        # Due to inheritance, a direct DB lookup for the primary key
        # won't return anything. This is because the direct lookup
        # checks the PK of PolicyBindingModel (for example),
        # but we get given the Primary Key of the inheriting class
        wanted = str(data)
        for model in self.get_queryset().select_subclasses().all():
            if str(model.pk) == wanted:
                return model
        # as a fallback we still try a direct lookup
        return self.get_queryset().get_subclass(pk=data)
    except ObjectDoesNotExist:
        self.fail("does_not_exist", pk_value=data)
    # Parenthesised tuple: the bare `except A, B:` form only parses on Python 3.14+
    except (TypeError, ValueError):
        self.fail("incorrect_type", data_type=type(data).__name__)
Transform the incoming primitive data into a native value.
class PolicyBindingSerializer(ModelSerializer):
    """PolicyBinding Serializer"""

    # Because we're not interested in the PolicyBindingModel's PK but rather the subclasses PK,
    # we have to manually declare this field
    target = PolicyBindingModelForeignKey(
        queryset=PolicyBindingModel.objects.select_subclasses(),
        required=True,
    )

    # Read-only nested representations of the bound policy, group and user
    policy_obj = PolicySerializer(required=False, read_only=True, source="policy")
    group_obj = PartialGroupSerializer(required=False, read_only=True, source="group")
    user_obj = PartialUserSerializer(required=False, read_only=True, source="user")

    class Meta:
        model = PolicyBinding
        fields = [
            "pk",
            "policy",
            "group",
            "user",
            "policy_obj",
            "group_obj",
            "user_obj",
            "target",
            "negate",
            "enabled",
            "order",
            "timeout",
            "failure_result",
        ]

    def validate(self, attrs: OrderedDict) -> OrderedDict:
        """Check that exactly one of the target's supported binding fields is set.

        The candidate field names come from `target.supported_policy_binding_targets()`.
        Raises ValidationError when none or more than one of them has a truthy value,
        or when no target was submitted at all.
        """
        target: PolicyBindingModel = attrs.get("target")
        if target is None:
            # A partial update may omit `target`; report that instead of failing
            # with an AttributeError on the call below.
            raise ValidationError({"target": "This field is required."})
        # sorted() copies, so the list owned by the target is never reordered in place
        supported = sorted(target.supported_policy_binding_targets())
        count = sum(bool(attrs.get(x)) for x in supported)
        warning = ", ".join(f"'{x}'" for x in supported)
        if count > 1:
            raise ValidationError(f"Only one of {warning} can be set.")
        if count < 1:
            raise ValidationError(f"One of {warning} must be set.")
        return attrs
PolicyBinding Serializer
def
validate(self, attrs: collections.OrderedDict) -> collections.OrderedDict:
def validate(self, attrs: OrderedDict) -> OrderedDict:
    """Check that exactly one of the target's supported binding fields is set.

    The candidate field names come from `target.supported_policy_binding_targets()`.
    Raises ValidationError when none or more than one of them has a truthy value,
    or when no target was submitted at all.
    """
    target: PolicyBindingModel = attrs.get("target")
    if target is None:
        # A partial update may omit `target`; report that instead of failing
        # with an AttributeError on the call below.
        raise ValidationError({"target": "This field is required."})
    # sorted() copies, so the list owned by the target is never reordered in place
    supported = sorted(target.supported_policy_binding_targets())
    count = sum(bool(attrs.get(x)) for x in supported)
    warning = ", ".join(f"'{x}'" for x in supported)
    if count > 1:
        raise ValidationError(f"Only one of {warning} can be set.")
    if count < 1:
        raise ValidationError(f"One of {warning} must be set.")
    return attrs
Check that exactly one of the target's supported binding fields (policy, group or user) is set.
Inherited Members
class
PolicyBindingSerializer.Meta:
class Meta:
    # Model this serializer is generated from
    model = PolicyBinding
    fields = [
        # identity
        "pk",
        # what is bound: the raw relations followed by their *_obj counterparts
        "policy",
        "group",
        "user",
        "policy_obj",
        "group_obj",
        "user_obj",
        # what it is bound to
        "target",
        # binding options
        "negate",
        "enabled",
        "order",
        "timeout",
        "failure_result",
    ]
model =
<class 'authentik.policies.models.PolicyBinding'>
class
PolicyBindingFilter(django_filters.filterset.FilterSet):
class PolicyBindingFilter(FilterSet):
    """Query-parameter filters available for PolicyBindings"""

    # Matches bindings whose target's pbm_uuid is one of the submitted values
    target_in = ModelMultipleChoiceFilter(
        queryset=PolicyBindingModel.objects.select_subclasses(),
        field_name="target__pbm_uuid",
        to_field_name="pbm_uuid",
    )
    # Boolean switch applying the `isnull` lookup to the policy relation
    policy__isnull = BooleanFilter(field_name="policy", lookup_expr="isnull")

    class Meta:
        model = PolicyBinding
        fields = [
            "policy",
            "policy__isnull",
            "target",
            "target_in",
            "enabled",
            "order",
            "timeout",
        ]
Filter for PolicyBindings
declared_filters =
OrderedDict({'target_in': <django_filters.filters.ModelMultipleChoiceFilter object>, 'policy__isnull': <django_filters.filters.BooleanFilter object>})
base_filters =
OrderedDict({'policy': <django_filters.filters.ModelChoiceFilter object>, 'policy__isnull': <django_filters.filters.BooleanFilter object>, 'target': <django_filters.filters.ModelChoiceFilter object>, 'target_in': <django_filters.filters.ModelMultipleChoiceFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'order': <django_filters.filters.NumberFilter object>, 'timeout': <django_filters.filters.NumberFilter object>})
class
PolicyBindingFilter.Meta:
class Meta:
    # Model the filters are generated for
    model = PolicyBinding
    # Filter names exposed by the FilterSet
    fields = [
        "policy",
        "policy__isnull",
        "target",
        "target_in",
        "enabled",
        "order",
        "timeout",
    ]
model =
<class 'authentik.policies.models.PolicyBinding'>
class
PolicyBindingViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class PolicyBindingViewSet(UsedByMixin, ModelViewSet):
    """PolicyBinding Viewset"""

    serializer_class = PolicyBindingSerializer
    filterset_class = PolicyBindingFilter

    # target, group and user are joined into the main query;
    # policy is prefetched so we resolve the subclass
    queryset = (
        PolicyBinding.objects.all()
        .select_related("target", "group", "user")
        .prefetch_related("policy")
    )

    search_fields = ["policy__name"]
    # Default ordering, followed by the fields a client may order by.
    # NOTE(review): the filter in this module addresses the target key as
    # `target__pbm_uuid`; confirm that `target__uuid` is a valid lookup.
    ordering = ["order", "pk"]
    ordering_fields = ["order", "target__uuid", "pk"]
PolicyBinding Viewset
serializer_class =
<class 'PolicyBindingSerializer'>
filterset_class =
<class 'PolicyBindingFilter'>