authentik.enterprise.lifecycle.api.rules

  1from django.utils.translation import gettext as _
  2from rest_framework.exceptions import ValidationError
  3from rest_framework.fields import SerializerMethodField
  4from rest_framework.relations import SlugRelatedField
  5from rest_framework.viewsets import ModelViewSet
  6
  7from authentik.core.api.utils import ModelSerializer
  8from authentik.core.models import User
  9from authentik.enterprise.api import EnterpriseRequiredMixin
 10from authentik.enterprise.lifecycle.models import LifecycleRule
 11from authentik.enterprise.lifecycle.utils import (
 12    ContentTypeField,
 13    ReviewerGroupSerializer,
 14    ReviewerUserSerializer,
 15)
 16from authentik.lib.utils.time import timedelta_from_string
 17
 18
 19class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer):
 20    content_type = ContentTypeField()
 21    target_verbose = SerializerMethodField()
 22    reviewer_groups_obj = ReviewerGroupSerializer(
 23        many=True, read_only=True, source="reviewer_groups"
 24    )
 25    reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all())
 26    reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers")
 27
 28    class Meta:
 29        model = LifecycleRule
 30        fields = [
 31            "id",
 32            "name",
 33            "content_type",
 34            "object_id",
 35            "interval",
 36            "grace_period",
 37            "reviewer_groups",
 38            "reviewer_groups_obj",
 39            "min_reviewers",
 40            "min_reviewers_is_per_group",
 41            "reviewers",
 42            "reviewers_obj",
 43            "notification_transports",
 44            "target_verbose",
 45        ]
 46        read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
 47
 48    def get_target_verbose(self, rule: LifecycleRule) -> str:
 49        if rule.object_id is None:
 50            return rule.content_type.model_class()._meta.verbose_name_plural
 51        else:
 52            return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}"
 53
 54    def validate_object_id(self, value: str) -> str | None:
 55        if value == "":
 56            return None
 57        return value
 58
 59    def validate(self, attrs: dict) -> dict:
 60        if (
 61            attrs.get("object_id") is not None
 62            and not attrs["content_type"]
 63            .get_all_objects_for_this_type(pk=attrs["object_id"])
 64            .exists()
 65        ):
 66            raise ValidationError({"object_id": _("Object does not exist")})
 67        if "reviewer_groups" in attrs or "reviewers" in attrs:
 68            reviewer_groups = attrs.get(
 69                "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else []
 70            )
 71            reviewers = attrs.get(
 72                "reviewers", self.instance.reviewers.all() if self.instance else []
 73            )
 74            if len(reviewer_groups) == 0 and len(reviewers) == 0:
 75                raise ValidationError(_("Either a reviewer group or a reviewer must be set."))
 76        if "grace_period" in attrs or "interval" in attrs:
 77            grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None))
 78            interval = attrs.get("interval", getattr(self.instance, "interval", None))
 79            if (
 80                grace_period is not None
 81                and interval is not None
 82                and (timedelta_from_string(grace_period) > timedelta_from_string(interval))
 83            ):
 84                raise ValidationError(
 85                    {"grace_period": _("Grace period must be shorter than the interval.")}
 86                )
 87        if "content_type" in attrs or "object_id" in attrs:
 88            content_type = attrs.get("content_type", getattr(self.instance, "content_type", None))
 89            object_id = attrs.get("object_id", getattr(self.instance, "object_id", None))
 90            if content_type is not None and object_id is None:
 91                existing = LifecycleRule.objects.filter(
 92                    content_type=content_type, object_id__isnull=True
 93                )
 94                if self.instance:
 95                    existing = existing.exclude(pk=self.instance.pk)
 96                if existing.exists():
 97                    raise ValidationError(
 98                        {
 99                            "content_type": _(
100                                "Only one type-wide rule for each object type is allowed."
101                            )
102                        }
103                    )
104        return attrs
105
106
class LifecycleRuleViewSet(ModelViewSet):
    # API endpoints for lifecycle rules, built on ModelViewSet.

    # Data source and (de)serialization.
    serializer_class = LifecycleRuleSerializer
    queryset = LifecycleRule.objects.all()

    # Filtering and free-text search lookups (consumed by whichever filter
    # backends the project configures — confirm in the REST framework settings).
    filterset_fields = ["content_type__model"]
    search_fields = [
        "content_type__model",
        "reviewer_groups__name",
        "reviewers__username",
    ]

    # Sortable fields and the ordering applied when none is requested.
    ordering_fields = ["name", "content_type__model"]
    ordering = ["name"]
 20class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer):
 21    content_type = ContentTypeField()
 22    target_verbose = SerializerMethodField()
 23    reviewer_groups_obj = ReviewerGroupSerializer(
 24        many=True, read_only=True, source="reviewer_groups"
 25    )
 26    reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all())
 27    reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers")
 28
 29    class Meta:
 30        model = LifecycleRule
 31        fields = [
 32            "id",
 33            "name",
 34            "content_type",
 35            "object_id",
 36            "interval",
 37            "grace_period",
 38            "reviewer_groups",
 39            "reviewer_groups_obj",
 40            "min_reviewers",
 41            "min_reviewers_is_per_group",
 42            "reviewers",
 43            "reviewers_obj",
 44            "notification_transports",
 45            "target_verbose",
 46        ]
 47        read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
 48
 49    def get_target_verbose(self, rule: LifecycleRule) -> str:
 50        if rule.object_id is None:
 51            return rule.content_type.model_class()._meta.verbose_name_plural
 52        else:
 53            return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}"
 54
 55    def validate_object_id(self, value: str) -> str | None:
 56        if value == "":
 57            return None
 58        return value
 59
 60    def validate(self, attrs: dict) -> dict:
 61        if (
 62            attrs.get("object_id") is not None
 63            and not attrs["content_type"]
 64            .get_all_objects_for_this_type(pk=attrs["object_id"])
 65            .exists()
 66        ):
 67            raise ValidationError({"object_id": _("Object does not exist")})
 68        if "reviewer_groups" in attrs or "reviewers" in attrs:
 69            reviewer_groups = attrs.get(
 70                "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else []
 71            )
 72            reviewers = attrs.get(
 73                "reviewers", self.instance.reviewers.all() if self.instance else []
 74            )
 75            if len(reviewer_groups) == 0 and len(reviewers) == 0:
 76                raise ValidationError(_("Either a reviewer group or a reviewer must be set."))
 77        if "grace_period" in attrs or "interval" in attrs:
 78            grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None))
 79            interval = attrs.get("interval", getattr(self.instance, "interval", None))
 80            if (
 81                grace_period is not None
 82                and interval is not None
 83                and (timedelta_from_string(grace_period) > timedelta_from_string(interval))
 84            ):
 85                raise ValidationError(
 86                    {"grace_period": _("Grace period must be shorter than the interval.")}
 87                )
 88        if "content_type" in attrs or "object_id" in attrs:
 89            content_type = attrs.get("content_type", getattr(self.instance, "content_type", None))
 90            object_id = attrs.get("object_id", getattr(self.instance, "object_id", None))
 91            if content_type is not None and object_id is None:
 92                existing = LifecycleRule.objects.filter(
 93                    content_type=content_type, object_id__isnull=True
 94                )
 95                if self.instance:
 96                    existing = existing.exclude(pk=self.instance.pk)
 97                if existing.exists():
 98                    raise ValidationError(
 99                        {
100                            "content_type": _(
101                                "Only one type-wide rule for each object type is allowed."
102                            )
103                        }
104                    )
105        return attrs

Mixin that validates that a valid enterprise license exists before allowing the object to be saved.

content_type
target_verbose
reviewer_groups_obj
reviewers
reviewers_obj
def get_target_verbose(self, rule: authentik.enterprise.lifecycle.models.LifecycleRule) -> str:
49    def get_target_verbose(self, rule: LifecycleRule) -> str:
50        if rule.object_id is None:
51            return rule.content_type.model_class()._meta.verbose_name_plural
52        else:
53            return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}"
def validate_object_id(self, value: str) -> str | None:
55    def validate_object_id(self, value: str) -> str | None:
56        if value == "":
57            return None
58        return value
def validate(self, attrs: dict) -> dict:
 60    def validate(self, attrs: dict) -> dict:
 61        if (
 62            attrs.get("object_id") is not None
 63            and not attrs["content_type"]
 64            .get_all_objects_for_this_type(pk=attrs["object_id"])
 65            .exists()
 66        ):
 67            raise ValidationError({"object_id": _("Object does not exist")})
 68        if "reviewer_groups" in attrs or "reviewers" in attrs:
 69            reviewer_groups = attrs.get(
 70                "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else []
 71            )
 72            reviewers = attrs.get(
 73                "reviewers", self.instance.reviewers.all() if self.instance else []
 74            )
 75            if len(reviewer_groups) == 0 and len(reviewers) == 0:
 76                raise ValidationError(_("Either a reviewer group or a reviewer must be set."))
 77        if "grace_period" in attrs or "interval" in attrs:
 78            grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None))
 79            interval = attrs.get("interval", getattr(self.instance, "interval", None))
 80            if (
 81                grace_period is not None
 82                and interval is not None
 83                and (timedelta_from_string(grace_period) > timedelta_from_string(interval))
 84            ):
 85                raise ValidationError(
 86                    {"grace_period": _("Grace period must be shorter than the interval.")}
 87                )
 88        if "content_type" in attrs or "object_id" in attrs:
 89            content_type = attrs.get("content_type", getattr(self.instance, "content_type", None))
 90            object_id = attrs.get("object_id", getattr(self.instance, "object_id", None))
 91            if content_type is not None and object_id is None:
 92                existing = LifecycleRule.objects.filter(
 93                    content_type=content_type, object_id__isnull=True
 94                )
 95                if self.instance:
 96                    existing = existing.exclude(pk=self.instance.pk)
 97                if existing.exists():
 98                    raise ValidationError(
 99                        {
100                            "content_type": _(
101                                "Only one type-wide rule for each object type is allowed."
102                            )
103                        }
104                    )
105        return attrs

Check that a valid license exists

class LifecycleRuleSerializer.Meta:
29    class Meta:
30        model = LifecycleRule
31        fields = [
32            "id",
33            "name",
34            "content_type",
35            "object_id",
36            "interval",
37            "grace_period",
38            "reviewer_groups",
39            "reviewer_groups_obj",
40            "min_reviewers",
41            "min_reviewers_is_per_group",
42            "reviewers",
43            "reviewers_obj",
44            "notification_transports",
45            "target_verbose",
46        ]
47        read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
fields = ['id', 'name', 'content_type', 'object_id', 'interval', 'grace_period', 'reviewer_groups', 'reviewer_groups_obj', 'min_reviewers', 'min_reviewers_is_per_group', 'reviewers', 'reviewers_obj', 'notification_transports', 'target_verbose']
read_only_fields = ['id', 'reviewers_obj', 'reviewer_groups_obj', 'target_verbose']
class LifecycleRuleViewSet(rest_framework.viewsets.ModelViewSet):
class LifecycleRuleViewSet(ModelViewSet):
    # API endpoints for lifecycle rules, built on ModelViewSet.

    # Data source and (de)serialization.
    serializer_class = LifecycleRuleSerializer
    queryset = LifecycleRule.objects.all()

    # Filtering and free-text search lookups (consumed by whichever filter
    # backends the project configures — confirm in the REST framework settings).
    filterset_fields = ["content_type__model"]
    search_fields = [
        "content_type__model",
        "reviewer_groups__name",
        "reviewers__username",
    ]

    # Sortable fields and the ordering applied when none is requested.
    ordering_fields = ["name", "content_type__model"]
    ordering = ["name"]

A viewset that provides default create(), retrieve(), update(), partial_update(), destroy() and list() actions.

queryset = <QuerySet []>
serializer_class = <class 'LifecycleRuleSerializer'>
search_fields = ['content_type__model', 'reviewer_groups__name', 'reviewers__username']
ordering = ['name']
ordering_fields = ['name', 'content_type__model']
filterset_fields = ['content_type__model']
name = None
description = None
suffix = None
detail = None
basename = None