authentik.enterprise.lifecycle.api.rules
1from django.utils.translation import gettext as _ 2from rest_framework.exceptions import ValidationError 3from rest_framework.fields import SerializerMethodField 4from rest_framework.relations import SlugRelatedField 5from rest_framework.viewsets import ModelViewSet 6 7from authentik.core.api.utils import ModelSerializer 8from authentik.core.models import User 9from authentik.enterprise.api import EnterpriseRequiredMixin 10from authentik.enterprise.lifecycle.models import LifecycleRule 11from authentik.enterprise.lifecycle.utils import ( 12 ContentTypeField, 13 ReviewerGroupSerializer, 14 ReviewerUserSerializer, 15) 16from authentik.lib.utils.time import timedelta_from_string 17 18 19class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer): 20 content_type = ContentTypeField() 21 target_verbose = SerializerMethodField() 22 reviewer_groups_obj = ReviewerGroupSerializer( 23 many=True, read_only=True, source="reviewer_groups" 24 ) 25 reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all()) 26 reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers") 27 28 class Meta: 29 model = LifecycleRule 30 fields = [ 31 "id", 32 "name", 33 "content_type", 34 "object_id", 35 "interval", 36 "grace_period", 37 "reviewer_groups", 38 "reviewer_groups_obj", 39 "min_reviewers", 40 "min_reviewers_is_per_group", 41 "reviewers", 42 "reviewers_obj", 43 "notification_transports", 44 "target_verbose", 45 ] 46 read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"] 47 48 def get_target_verbose(self, rule: LifecycleRule) -> str: 49 if rule.object_id is None: 50 return rule.content_type.model_class()._meta.verbose_name_plural 51 else: 52 return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}" 53 54 def validate_object_id(self, value: str) -> str | None: 55 if value == "": 56 return None 57 return value 58 59 def validate(self, attrs: dict) -> dict: 60 if ( 61 attrs.get("object_id") is not None 62 and not attrs["content_type"] 63 .get_all_objects_for_this_type(pk=attrs["object_id"]) 64 .exists() 65 ): 66 raise ValidationError({"object_id": _("Object does not exist")}) 67 if "reviewer_groups" in attrs or "reviewers" in attrs: 68 reviewer_groups = attrs.get( 69 "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else [] 70 ) 71 reviewers = attrs.get( 72 "reviewers", self.instance.reviewers.all() if self.instance else [] 73 ) 74 if len(reviewer_groups) == 0 and len(reviewers) == 0: 75 raise ValidationError(_("Either a reviewer group or a reviewer must be set.")) 76 if "grace_period" in attrs or "interval" in attrs: 77 grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None)) 78 interval = attrs.get("interval", getattr(self.instance, "interval", None)) 79 if ( 80 grace_period is not None 81 and interval is not None 82 and (timedelta_from_string(grace_period) > timedelta_from_string(interval)) 83 ): 84 raise ValidationError( 85 {"grace_period": _("Grace period must be shorter than the interval.")} 86 ) 87 if "content_type" in attrs or "object_id" in attrs: 88 content_type = attrs.get("content_type", getattr(self.instance, "content_type", None)) 89 object_id = attrs.get("object_id", getattr(self.instance, "object_id", None)) 90 if content_type is not None and object_id is None: 91 existing = LifecycleRule.objects.filter( 92 content_type=content_type, object_id__isnull=True 93 ) 94 if self.instance: 95 existing = existing.exclude(pk=self.instance.pk) 96 if existing.exists(): 97 raise ValidationError( 98 { 99 "content_type": _( 100 "Only one type-wide rule for each object type is allowed." 101 ) 102 } 103 ) 104 return attrs 105 106 107class LifecycleRuleViewSet(ModelViewSet): 108 queryset = LifecycleRule.objects.all() 109 serializer_class = LifecycleRuleSerializer 110 search_fields = ["content_type__model", "reviewer_groups__name", "reviewers__username"] 111 ordering = ["name"] 112 ordering_fields = ["name", "content_type__model"] 113 filterset_fields = ["content_type__model"]
class
LifecycleRuleSerializer(authentik.enterprise.api.EnterpriseRequiredMixin, authentik.core.api.utils.ModelSerializer):
20class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer): 21 content_type = ContentTypeField() 22 target_verbose = SerializerMethodField() 23 reviewer_groups_obj = ReviewerGroupSerializer( 24 many=True, read_only=True, source="reviewer_groups" 25 ) 26 reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all()) 27 reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers") 28 29 class Meta: 30 model = LifecycleRule 31 fields = [ 32 "id", 33 "name", 34 "content_type", 35 "object_id", 36 "interval", 37 "grace_period", 38 "reviewer_groups", 39 "reviewer_groups_obj", 40 "min_reviewers", 41 "min_reviewers_is_per_group", 42 "reviewers", 43 "reviewers_obj", 44 "notification_transports", 45 "target_verbose", 46 ] 47 read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"] 48 49 def get_target_verbose(self, rule: LifecycleRule) -> str: 50 if rule.object_id is None: 51 return rule.content_type.model_class()._meta.verbose_name_plural 52 else: 53 return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}" 54 55 def validate_object_id(self, value: str) -> str | None: 56 if value == "": 57 return None 58 return value 59 60 def validate(self, attrs: dict) -> dict: 61 if ( 62 attrs.get("object_id") is not None 63 and not attrs["content_type"] 64 .get_all_objects_for_this_type(pk=attrs["object_id"]) 65 .exists() 66 ): 67 raise ValidationError({"object_id": _("Object does not exist")}) 68 if "reviewer_groups" in attrs or "reviewers" in attrs: 69 reviewer_groups = attrs.get( 70 "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else [] 71 ) 72 reviewers = attrs.get( 73 "reviewers", self.instance.reviewers.all() if self.instance else [] 74 ) 75 if len(reviewer_groups) == 0 and len(reviewers) == 0: 76 raise ValidationError(_("Either a reviewer group or a reviewer must be set.")) 77 if "grace_period" in attrs or "interval" in attrs: 78 grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None)) 79 interval = attrs.get("interval", getattr(self.instance, "interval", None)) 80 if ( 81 grace_period is not None 82 and interval is not None 83 and (timedelta_from_string(grace_period) > timedelta_from_string(interval)) 84 ): 85 raise ValidationError( 86 {"grace_period": _("Grace period must be shorter than the interval.")} 87 ) 88 if "content_type" in attrs or "object_id" in attrs: 89 content_type = attrs.get("content_type", getattr(self.instance, "content_type", None)) 90 object_id = attrs.get("object_id", getattr(self.instance, "object_id", None)) 91 if content_type is not None and object_id is None: 92 existing = LifecycleRule.objects.filter( 93 content_type=content_type, object_id__isnull=True 94 ) 95 if self.instance: 96 existing = existing.exclude(pk=self.instance.pk) 97 if existing.exists(): 98 raise ValidationError( 99 { 100 "content_type": _( 101 "Only one type-wide rule for each object type is allowed." 102 ) 103 } 104 ) 105 return attrs
Mixin to validate that a valid enterprise license exists before allowing to save the object
def
validate(self, attrs: dict) -> dict:
60 def validate(self, attrs: dict) -> dict: 61 if ( 62 attrs.get("object_id") is not None 63 and not attrs["content_type"] 64 .get_all_objects_for_this_type(pk=attrs["object_id"]) 65 .exists() 66 ): 67 raise ValidationError({"object_id": _("Object does not exist")}) 68 if "reviewer_groups" in attrs or "reviewers" in attrs: 69 reviewer_groups = attrs.get( 70 "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else [] 71 ) 72 reviewers = attrs.get( 73 "reviewers", self.instance.reviewers.all() if self.instance else [] 74 ) 75 if len(reviewer_groups) == 0 and len(reviewers) == 0: 76 raise ValidationError(_("Either a reviewer group or a reviewer must be set.")) 77 if "grace_period" in attrs or "interval" in attrs: 78 grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None)) 79 interval = attrs.get("interval", getattr(self.instance, "interval", None)) 80 if ( 81 grace_period is not None 82 and interval is not None 83 and (timedelta_from_string(grace_period) > timedelta_from_string(interval)) 84 ): 85 raise ValidationError( 86 {"grace_period": _("Grace period must be shorter than the interval.")} 87 ) 88 if "content_type" in attrs or "object_id" in attrs: 89 content_type = attrs.get("content_type", getattr(self.instance, "content_type", None)) 90 object_id = attrs.get("object_id", getattr(self.instance, "object_id", None)) 91 if content_type is not None and object_id is None: 92 existing = LifecycleRule.objects.filter( 93 content_type=content_type, object_id__isnull=True 94 ) 95 if self.instance: 96 existing = existing.exclude(pk=self.instance.pk) 97 if existing.exists(): 98 raise ValidationError( 99 { 100 "content_type": _( 101 "Only one type-wide rule for each object type is allowed." 102 ) 103 } 104 ) 105 return attrs
Check that a valid license exists
Inherited Members
class
LifecycleRuleSerializer.Meta:
29 class Meta: 30 model = LifecycleRule 31 fields = [ 32 "id", 33 "name", 34 "content_type", 35 "object_id", 36 "interval", 37 "grace_period", 38 "reviewer_groups", 39 "reviewer_groups_obj", 40 "min_reviewers", 41 "min_reviewers_is_per_group", 42 "reviewers", 43 "reviewers_obj", 44 "notification_transports", 45 "target_verbose", 46 ] 47 read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
model =
<class 'authentik.enterprise.lifecycle.models.LifecycleRule'>
class
LifecycleRuleViewSet(rest_framework.viewsets.ModelViewSet):
108class LifecycleRuleViewSet(ModelViewSet): 109 queryset = LifecycleRule.objects.all() 110 serializer_class = LifecycleRuleSerializer 111 search_fields = ["content_type__model", "reviewer_groups__name", "reviewers__username"] 112 ordering = ["name"] 113 ordering_fields = ["name", "content_type__model"] 114 filterset_fields = ["content_type__model"]
A viewset that provides default create(), retrieve(), update(),
partial_update(), destroy() and list() actions.
serializer_class =
<class 'LifecycleRuleSerializer'>