authentik.policies.api.bindings

policy binding API Views

  1"""policy binding API Views"""
  2
  3from collections import OrderedDict
  4
  5from django.core.exceptions import ObjectDoesNotExist
  6from django_filters.filters import BooleanFilter, ModelMultipleChoiceFilter
  7from django_filters.filterset import FilterSet
  8from rest_framework.exceptions import ValidationError
  9from rest_framework.serializers import PrimaryKeyRelatedField
 10from rest_framework.viewsets import ModelViewSet
 11
 12from authentik.core.api.groups import PartialUserSerializer
 13from authentik.core.api.used_by import UsedByMixin
 14from authentik.core.api.users import PartialGroupSerializer
 15from authentik.core.api.utils import ModelSerializer
 16from authentik.policies.api.policies import PolicySerializer
 17from authentik.policies.models import PolicyBinding, PolicyBindingModel
 18
 19
 20class PolicyBindingModelForeignKey(PrimaryKeyRelatedField):
 21    """rest_framework PrimaryKeyRelatedField which resolves
 22    model_manager's InheritanceQuerySet"""
 23
 24    def use_pk_only_optimization(self):
 25        return False
 26
 27    def to_internal_value(self, data):
 28        if self.pk_field is not None:
 29            data = self.pk_field.to_internal_value(data)
 30        try:
 31            # Due to inheritance, a direct DB lookup for the primary key
 32            # won't return anything. This is because the direct lookup
 33            # checks the PK of PolicyBindingModel (for example),
 34            # but we get given the Primary Key of the inheriting class
 35            for model in self.get_queryset().select_subclasses().all():
 36                if str(model.pk) == str(data):
 37                    return model
 38            # as a fallback we still try a direct lookup
 39            return self.get_queryset().get_subclass(pk=data)
 40        except ObjectDoesNotExist:
 41            self.fail("does_not_exist", pk_value=data)
 42        except TypeError, ValueError:
 43            self.fail("incorrect_type", data_type=type(data).__name__)
 44
 45    def to_representation(self, value):
 46        correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid)
 47        return correct_model.pk
 48
 49
 50class PolicyBindingSerializer(ModelSerializer):
 51    """PolicyBinding Serializer"""
 52
 53    # Because we're not interested in the PolicyBindingModel's PK but rather the subclasses PK,
 54    # we have to manually declare this field
 55    target = PolicyBindingModelForeignKey(
 56        queryset=PolicyBindingModel.objects.select_subclasses(),
 57        required=True,
 58    )
 59
 60    policy_obj = PolicySerializer(required=False, read_only=True, source="policy")
 61    group_obj = PartialGroupSerializer(required=False, read_only=True, source="group")
 62    user_obj = PartialUserSerializer(required=False, read_only=True, source="user")
 63
 64    class Meta:
 65        model = PolicyBinding
 66        fields = [
 67            "pk",
 68            "policy",
 69            "group",
 70            "user",
 71            "policy_obj",
 72            "group_obj",
 73            "user_obj",
 74            "target",
 75            "negate",
 76            "enabled",
 77            "order",
 78            "timeout",
 79            "failure_result",
 80        ]
 81
 82    def validate(self, attrs: OrderedDict) -> OrderedDict:
 83        """Check that either policy, group or user is set."""
 84        target: PolicyBindingModel = attrs.get("target")
 85        supported = target.supported_policy_binding_targets()
 86        supported.sort()
 87        count = sum([bool(attrs.get(x, None)) for x in supported])
 88        invalid = count > 1
 89        empty = count < 1
 90        warning = ", ".join(f"'{x}'" for x in supported)
 91        if invalid:
 92            raise ValidationError(f"Only one of {warning} can be set.")
 93        if empty:
 94            raise ValidationError(f"One of {warning} must be set.")
 95        return attrs
 96
 97
class PolicyBindingFilter(FilterSet):
    """Filter for PolicyBindings"""

    # Multi-value filter on the target's `pbm_uuid`; the accepted values are
    # validated against the subclass-aware PolicyBindingModel queryset.
    target_in = ModelMultipleChoiceFilter(
        field_name="target__pbm_uuid",
        to_field_name="pbm_uuid",
        queryset=PolicyBindingModel.objects.select_subclasses(),
    )
    # Boolean filter that applies the `isnull` lookup to the `policy` field.
    policy__isnull = BooleanFilter("policy", "isnull")

    class Meta:
        model = PolicyBinding
        # Mixes auto-generated model-field filters with the two declared above.
        fields = ["policy", "policy__isnull", "target", "target_in", "enabled", "order", "timeout"]
111
112
class PolicyBindingViewSet(UsedByMixin, ModelViewSet):
    """PolicyBinding Viewset"""

    # target, group and user are joined into the main query; policy is
    # fetched through a separate prefetch.
    queryset = (
        PolicyBinding.objects.all()
        .select_related("target", "group", "user")
        .prefetch_related("policy")
    )  # prefetching policy so we resolve the subclass
    serializer_class = PolicyBindingSerializer
    # Free-text search is matched against the bound policy's name.
    search_fields = ["policy__name"]
    filterset_class = PolicyBindingFilter
    # Default ordering when the request does not ask for one.
    ordering = ["order", "pk"]
    # NOTE(review): the rest of this module addresses the target by
    # `pbm_uuid`; confirm that `target__uuid` resolves to a real field.
    ordering_fields = ["order", "target__uuid", "pk"]
class PolicyBindingModelForeignKey(rest_framework.relations.PrimaryKeyRelatedField):
class PolicyBindingModelForeignKey(PrimaryKeyRelatedField):
    """rest_framework PrimaryKeyRelatedField which resolves
    model_manager's InheritanceQuerySet.

    Incoming values are matched against the primary key of the concrete
    subclass instances, and outgoing values are the primary key of the
    concrete subclass rather than that of the base model.
    """

    def use_pk_only_optimization(self):
        """Opt out of the PK-only shortcut.

        `to_representation` reads `value.pbm_uuid`, so it needs the real
        model instance rather than a primary-key-only placeholder.
        """
        return False

    def to_internal_value(self, data):
        """Resolve the submitted primary key to the matching subclass instance.

        Fails with `does_not_exist` when the lookup raises
        ObjectDoesNotExist, and with `incorrect_type` when it raises
        TypeError or ValueError.
        """
        if self.pk_field is not None:
            data = self.pk_field.to_internal_value(data)
        try:
            # Due to inheritance, a direct DB lookup for the primary key
            # won't return anything. This is because the direct lookup
            # checks the PK of PolicyBindingModel (for example),
            # but we get given the Primary Key of the inheriting class
            wanted = str(data)  # loop-invariant, so only stringify once
            for model in self.get_queryset().select_subclasses().all():
                if str(model.pk) == wanted:
                    return model
            # as a fallback we still try a direct lookup
            return self.get_queryset().get_subclass(pk=data)
        except ObjectDoesNotExist:
            self.fail("does_not_exist", pk_value=data)
        except (TypeError, ValueError):
            # Parenthesised tuple: the bare comma form only parses on Python 3.14+.
            self.fail("incorrect_type", data_type=type(data).__name__)

    def to_representation(self, value):
        """Return the primary key of the concrete subclass behind `value`."""
        correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid)
        return correct_model.pk

rest_framework PrimaryKeyRelatedField which resolves model_manager's InheritanceQuerySet

def use_pk_only_optimization(self):
    def use_pk_only_optimization(self):
        """Opt out of the PK-only shortcut by always returning False."""
        return False
def to_internal_value(self, data):
28    def to_internal_value(self, data):
29        if self.pk_field is not None:
30            data = self.pk_field.to_internal_value(data)
31        try:
32            # Due to inheritance, a direct DB lookup for the primary key
33            # won't return anything. This is because the direct lookup
34            # checks the PK of PolicyBindingModel (for example),
35            # but we get given the Primary Key of the inheriting class
36            for model in self.get_queryset().select_subclasses().all():
37                if str(model.pk) == str(data):
38                    return model
39            # as a fallback we still try a direct lookup
40            return self.get_queryset().get_subclass(pk=data)
41        except ObjectDoesNotExist:
42            self.fail("does_not_exist", pk_value=data)
43        except TypeError, ValueError:
44            self.fail("incorrect_type", data_type=type(data).__name__)

Transform the incoming primitive data into a native value.

def to_representation(self, value):
46    def to_representation(self, value):
47        correct_model = PolicyBindingModel.objects.get_subclass(pbm_uuid=value.pbm_uuid)
48        return correct_model.pk

Transform the outgoing native value into primitive data.

class PolicyBindingSerializer(authentik.core.api.utils.ModelSerializer):
class PolicyBindingSerializer(ModelSerializer):
    """PolicyBinding Serializer"""

    # Because we're not interested in the PolicyBindingModel's PK but rather the subclasses PK,
    # we have to manually declare this field
    target = PolicyBindingModelForeignKey(
        queryset=PolicyBindingModel.objects.select_subclasses(),
        required=True,
    )

    # Read-only nested representations of the bound policy, group and user.
    policy_obj = PolicySerializer(required=False, read_only=True, source="policy")
    group_obj = PartialGroupSerializer(required=False, read_only=True, source="group")
    user_obj = PartialUserSerializer(required=False, read_only=True, source="user")

    class Meta:
        model = PolicyBinding
        fields = [
            "pk",
            "policy",
            "group",
            "user",
            "policy_obj",
            "group_obj",
            "user_obj",
            "target",
            "negate",
            "enabled",
            "order",
            "timeout",
            "failure_result",
        ]

    def validate(self, attrs: OrderedDict) -> OrderedDict:
        """Check that exactly one of the target's supported bindings is set.

        The candidate field names come from
        `target.supported_policy_binding_targets()`. On a partial update,
        fields missing from `attrs` are read from the existing instance, so
        a request that omits `target` no longer fails with an AttributeError.

        Raises:
            ValidationError: if no target can be determined, or if the number
                of supported fields with a truthy value is not exactly one.
        """
        instance = getattr(self, "instance", None)
        partial = bool(getattr(self, "partial", False)) and instance is not None

        def effective(name):
            """Return the value `name` will have once `attrs` is applied."""
            if name in attrs or not partial:
                return attrs.get(name, None)
            # Omitted from a partial update: the stored value stays in effect.
            return getattr(instance, name, None)

        target: PolicyBindingModel = effective("target")
        if target is None:
            raise ValidationError({"target": "This field is required."})
        # sorted() leaves the list returned by the target untouched.
        supported = sorted(target.supported_policy_binding_targets())
        count = sum(bool(effective(x)) for x in supported)
        warning = ", ".join(f"'{x}'" for x in supported)
        if count > 1:
            raise ValidationError(f"Only one of {warning} can be set.")
        if count < 1:
            raise ValidationError(f"One of {warning} must be set.")
        return attrs

PolicyBinding Serializer

target
policy_obj
group_obj
user_obj
def validate(self, attrs: collections.OrderedDict) -> collections.OrderedDict:
83    def validate(self, attrs: OrderedDict) -> OrderedDict:
84        """Check that either policy, group or user is set."""
85        target: PolicyBindingModel = attrs.get("target")
86        supported = target.supported_policy_binding_targets()
87        supported.sort()
88        count = sum([bool(attrs.get(x, None)) for x in supported])
89        invalid = count > 1
90        empty = count < 1
91        warning = ", ".join(f"'{x}'" for x in supported)
92        if invalid:
93            raise ValidationError(f"Only one of {warning} can be set.")
94        if empty:
95            raise ValidationError(f"One of {warning} must be set.")
96        return attrs

Check that either policy, group or user is set.

class PolicyBindingSerializer.Meta:
    class Meta:
        # Serializer options: the backing model and the fields it exposes.
        model = PolicyBinding
        fields = [
            "pk",
            "policy",
            "group",
            "user",
            # nested, read-only views of the three relations above
            "policy_obj",
            "group_obj",
            "user_obj",
            "target",
            "negate",
            "enabled",
            "order",
            "timeout",
            "failure_result",
        ]
fields = ['pk', 'policy', 'group', 'user', 'policy_obj', 'group_obj', 'user_obj', 'target', 'negate', 'enabled', 'order', 'timeout', 'failure_result']
class PolicyBindingFilter(django_filters.filterset.FilterSet):
class PolicyBindingFilter(FilterSet):
    """Filter for PolicyBindings"""

    # Multi-value filter on the target's `pbm_uuid`; the accepted values are
    # validated against the subclass-aware PolicyBindingModel queryset.
    target_in = ModelMultipleChoiceFilter(
        field_name="target__pbm_uuid",
        to_field_name="pbm_uuid",
        queryset=PolicyBindingModel.objects.select_subclasses(),
    )
    # Boolean filter that applies the `isnull` lookup to the `policy` field.
    policy__isnull = BooleanFilter("policy", "isnull")

    class Meta:
        model = PolicyBinding
        # Mixes auto-generated model-field filters with the two declared above.
        fields = ["policy", "policy__isnull", "target", "target_in", "enabled", "order", "timeout"]

Filter for PolicyBindings

target_in
policy__isnull
declared_filters = OrderedDict({'target_in': <django_filters.filters.ModelMultipleChoiceFilter object>, 'policy__isnull': <django_filters.filters.BooleanFilter object>})
base_filters = OrderedDict({'policy': <django_filters.filters.ModelChoiceFilter object>, 'policy__isnull': <django_filters.filters.BooleanFilter object>, 'target': <django_filters.filters.ModelChoiceFilter object>, 'target_in': <django_filters.filters.ModelMultipleChoiceFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'order': <django_filters.filters.NumberFilter object>, 'timeout': <django_filters.filters.NumberFilter object>})
class PolicyBindingFilter.Meta:
    class Meta:
        # Filterset options: the backing model and the filter names it exposes.
        model = PolicyBinding
        fields = ["policy", "policy__isnull", "target", "target_in", "enabled", "order", "timeout"]
fields = ['policy', 'policy__isnull', 'target', 'target_in', 'enabled', 'order', 'timeout']
class PolicyBindingViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class PolicyBindingViewSet(UsedByMixin, ModelViewSet):
    """PolicyBinding Viewset"""

    # target, group and user are joined into the main query; policy is
    # fetched through a separate prefetch.
    queryset = (
        PolicyBinding.objects.all()
        .select_related("target", "group", "user")
        .prefetch_related("policy")
    )  # prefetching policy so we resolve the subclass
    serializer_class = PolicyBindingSerializer
    # Free-text search is matched against the bound policy's name.
    search_fields = ["policy__name"]
    filterset_class = PolicyBindingFilter
    # Default ordering when the request does not ask for one.
    ordering = ["order", "pk"]
    # NOTE(review): the rest of this module addresses the target by
    # `pbm_uuid`; confirm that `target__uuid` resolves to a real field.
    ordering_fields = ["order", "target__uuid", "pk"]

PolicyBinding Viewset

queryset = <QuerySet []>
serializer_class = <class 'PolicyBindingSerializer'>
search_fields = ['policy__name']
filterset_class = <class 'PolicyBindingFilter'>
ordering = ['order', 'pk']
ordering_fields = ['order', 'target__uuid', 'pk']
name = None
description = None
suffix = None
detail = None
basename = None