authentik.enterprise.lifecycle.api.rules

 1from django.utils.translation import gettext as _
 2from rest_framework.exceptions import ValidationError
 3from rest_framework.fields import SerializerMethodField
 4from rest_framework.relations import SlugRelatedField
 5from rest_framework.viewsets import ModelViewSet
 6
 7from authentik.core.api.utils import ModelSerializer
 8from authentik.core.models import User
 9from authentik.enterprise.api import EnterpriseRequiredMixin
10from authentik.enterprise.lifecycle.models import LifecycleRule
11from authentik.enterprise.lifecycle.utils import (
12    ContentTypeField,
13    ReviewerGroupSerializer,
14    ReviewerUserSerializer,
15)
16from authentik.lib.utils.time import timedelta_from_string
17
18
# Serializer for LifecycleRule objects.
#
# Besides the plain model fields it exposes nested, read-only representations of
# the configured reviewers and reviewer groups, plus a human-readable description
# of the rule's target. Cross-field constraints are enforced in validate().
#
# NOTE(review): class-level documentation is kept as comments rather than a
# docstring because schema generators may use serializer docstrings as the
# component description - confirm before converting it to a docstring.
class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer):
    content_type = ContentTypeField()
    # Filled in by get_target_verbose().
    target_verbose = SerializerMethodField()
    # Writes go through "reviewer_groups" / "reviewers"; the *_obj fields expose
    # the same relations in nested form for reading only.
    reviewer_groups_obj = ReviewerGroupSerializer(
        many=True, read_only=True, source="reviewer_groups"
    )
    # Reviewers are looked up and represented by their "uuid" attribute.
    reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all())
    reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers")

    class Meta:
        model = LifecycleRule
        fields = [
            "id",
            "name",
            "content_type",
            "object_id",
            "interval",
            "grace_period",
            "reviewer_groups",
            "reviewer_groups_obj",
            "min_reviewers",
            "min_reviewers_is_per_group",
            "reviewers",
            "reviewers_obj",
            "notification_transports",
            "target_verbose",
        ]
        read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]

    # Describes what the rule applies to: the plural verbose name of the target
    # model when the rule has no object_id, otherwise "<verbose name>: <object>".
    # NOTE(review): documented with comments on purpose; a docstring here may be
    # used as the field description in a generated API schema - confirm.
    def get_target_verbose(self, rule: LifecycleRule) -> str:
        model_meta = rule.content_type.model_class()._meta
        if rule.object_id is None:
            return model_meta.verbose_name_plural
        return f"{model_meta.verbose_name}: {rule.object}"

    def validate_object_id(self, value: str) -> str | None:
        """Normalize an empty object_id to None; pass any other value through."""
        return None if value == "" else value

    def validate(self, attrs: dict) -> dict:
        """Run the cross-field checks for a lifecycle rule.

        Checks performed:

        * if an object_id is given, an object with that primary key must exist
          for the rule's content type;
        * when reviewers or reviewer groups are submitted, at least one reviewer
          or reviewer group must remain set;
        * when grace_period or interval are submitted, the grace period must not
          be longer than the interval.

        Fields missing from attrs (partial updates) fall back to the values
        stored on self.instance where one exists.

        Returns:
            The unchanged attrs dict.

        Raises:
            ValidationError: if any of the checks above fails.
        """
        instance = self.instance

        object_id = attrs.get("object_id")
        if object_id is not None:
            # A partial update may send object_id without content_type; use the
            # stored content type then instead of failing with a KeyError.
            content_type = attrs.get("content_type")
            if content_type is None:
                content_type = getattr(instance, "content_type", None)
            if (
                content_type is None
                or not content_type.get_all_objects_for_this_type(pk=object_id).exists()
            ):
                raise ValidationError({"object_id": _("Object does not exist")})

        if "reviewer_groups" in attrs or "reviewers" in attrs:
            reviewer_groups = attrs.get(
                "reviewer_groups", instance.reviewer_groups.all() if instance else []
            )
            reviewers = attrs.get("reviewers", instance.reviewers.all() if instance else [])
            if not reviewer_groups and not reviewers:
                raise ValidationError(_("Either a reviewer group or a reviewer must be set."))

        if "grace_period" in attrs or "interval" in attrs:
            grace_period = attrs.get("grace_period", getattr(instance, "grace_period", None))
            interval = attrs.get("interval", getattr(instance, "interval", None))
            # Equal durations are accepted; only a grace period that is longer
            # than the interval is rejected.
            if (
                grace_period is not None
                and interval is not None
                and timedelta_from_string(grace_period) > timedelta_from_string(interval)
            ):
                raise ValidationError(
                    {"grace_period": _("Grace period must be shorter than the interval.")}
                )
        return attrs
88
89
# Model viewset for lifecycle rules, serialized with LifecycleRuleSerializer.
# NOTE(review): documented with comments rather than a docstring so the view's
# description (which frameworks may derive from the docstring) stays unchanged.
class LifecycleRuleViewSet(ModelViewSet):
    queryset = LifecycleRule.objects.all()
    serializer_class = LifecycleRuleSerializer

    # Lookups used for filtering and free-text search: the target model name,
    # reviewer group names and reviewer usernames.
    filterset_fields = ["content_type__model"]
    search_fields = ["content_type__model", "reviewer_groups__name", "reviewers__username"]

    # Default sort key and the keys offered for ordering.
    ordering = ["name"]
    ordering_fields = ["name", "content_type__model"]
# Serializer for LifecycleRule objects.
#
# Besides the plain model fields it exposes nested, read-only representations of
# the configured reviewers and reviewer groups, plus a human-readable description
# of the rule's target. Cross-field constraints are enforced in validate().
#
# NOTE(review): class-level documentation is kept as comments rather than a
# docstring because schema generators may use serializer docstrings as the
# component description - confirm before converting it to a docstring.
class LifecycleRuleSerializer(EnterpriseRequiredMixin, ModelSerializer):
    content_type = ContentTypeField()
    # Filled in by get_target_verbose().
    target_verbose = SerializerMethodField()
    # Writes go through "reviewer_groups" / "reviewers"; the *_obj fields expose
    # the same relations in nested form for reading only.
    reviewer_groups_obj = ReviewerGroupSerializer(
        many=True, read_only=True, source="reviewer_groups"
    )
    # Reviewers are looked up and represented by their "uuid" attribute.
    reviewers = SlugRelatedField(slug_field="uuid", many=True, queryset=User.objects.all())
    reviewers_obj = ReviewerUserSerializer(many=True, read_only=True, source="reviewers")

    class Meta:
        model = LifecycleRule
        fields = [
            "id",
            "name",
            "content_type",
            "object_id",
            "interval",
            "grace_period",
            "reviewer_groups",
            "reviewer_groups_obj",
            "min_reviewers",
            "min_reviewers_is_per_group",
            "reviewers",
            "reviewers_obj",
            "notification_transports",
            "target_verbose",
        ]
        read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]

    # Describes what the rule applies to: the plural verbose name of the target
    # model when the rule has no object_id, otherwise "<verbose name>: <object>".
    # NOTE(review): documented with comments on purpose; a docstring here may be
    # used as the field description in a generated API schema - confirm.
    def get_target_verbose(self, rule: LifecycleRule) -> str:
        model_meta = rule.content_type.model_class()._meta
        if rule.object_id is None:
            return model_meta.verbose_name_plural
        return f"{model_meta.verbose_name}: {rule.object}"

    def validate_object_id(self, value: str) -> str | None:
        """Normalize an empty object_id to None; pass any other value through."""
        return None if value == "" else value

    def validate(self, attrs: dict) -> dict:
        """Run the cross-field checks for a lifecycle rule.

        Checks performed:

        * if an object_id is given, an object with that primary key must exist
          for the rule's content type;
        * when reviewers or reviewer groups are submitted, at least one reviewer
          or reviewer group must remain set;
        * when grace_period or interval are submitted, the grace period must not
          be longer than the interval.

        Fields missing from attrs (partial updates) fall back to the values
        stored on self.instance where one exists.

        Returns:
            The unchanged attrs dict.

        Raises:
            ValidationError: if any of the checks above fails.
        """
        instance = self.instance

        object_id = attrs.get("object_id")
        if object_id is not None:
            # A partial update may send object_id without content_type; use the
            # stored content type then instead of failing with a KeyError.
            content_type = attrs.get("content_type")
            if content_type is None:
                content_type = getattr(instance, "content_type", None)
            if (
                content_type is None
                or not content_type.get_all_objects_for_this_type(pk=object_id).exists()
            ):
                raise ValidationError({"object_id": _("Object does not exist")})

        if "reviewer_groups" in attrs or "reviewers" in attrs:
            reviewer_groups = attrs.get(
                "reviewer_groups", instance.reviewer_groups.all() if instance else []
            )
            reviewers = attrs.get("reviewers", instance.reviewers.all() if instance else [])
            if not reviewer_groups and not reviewers:
                raise ValidationError(_("Either a reviewer group or a reviewer must be set."))

        if "grace_period" in attrs or "interval" in attrs:
            grace_period = attrs.get("grace_period", getattr(instance, "grace_period", None))
            interval = attrs.get("interval", getattr(instance, "interval", None))
            # Equal durations are accepted; only a grace period that is longer
            # than the interval is rejected.
            if (
                grace_period is not None
                and interval is not None
                and timedelta_from_string(grace_period) > timedelta_from_string(interval)
            ):
                raise ValidationError(
                    {"grace_period": _("Grace period must be shorter than the interval.")}
                )
        return attrs

Mixin that validates that a valid enterprise license exists before allowing the object to be saved

content_type
target_verbose
reviewer_groups_obj
reviewers
reviewers_obj
def get_target_verbose(self, rule: authentik.enterprise.lifecycle.models.LifecycleRule) -> str:
49    def get_target_verbose(self, rule: LifecycleRule) -> str:
50        if rule.object_id is None:
51            return rule.content_type.model_class()._meta.verbose_name_plural
52        else:
53            return f"{rule.content_type.model_class()._meta.verbose_name}: {rule.object}"
def validate_object_id(self, value: str) -> str | None:
55    def validate_object_id(self, value: str) -> str | None:
56        if value == "":
57            return None
58        return value
def validate(self, attrs: dict) -> dict:
60    def validate(self, attrs: dict) -> dict:
61        if (
62            attrs.get("object_id") is not None
63            and not attrs["content_type"]
64            .get_all_objects_for_this_type(pk=attrs["object_id"])
65            .exists()
66        ):
67            raise ValidationError({"object_id": _("Object does not exist")})
68        if "reviewer_groups" in attrs or "reviewers" in attrs:
69            reviewer_groups = attrs.get(
70                "reviewer_groups", self.instance.reviewer_groups.all() if self.instance else []
71            )
72            reviewers = attrs.get(
73                "reviewers", self.instance.reviewers.all() if self.instance else []
74            )
75            if len(reviewer_groups) == 0 and len(reviewers) == 0:
76                raise ValidationError(_("Either a reviewer group or a reviewer must be set."))
77        if "grace_period" in attrs or "interval" in attrs:
78            grace_period = attrs.get("grace_period", getattr(self.instance, "grace_period", None))
79            interval = attrs.get("interval", getattr(self.instance, "interval", None))
80            if (
81                grace_period is not None
82                and interval is not None
83                and (timedelta_from_string(grace_period) > timedelta_from_string(interval))
84            ):
85                raise ValidationError(
86                    {"grace_period": _("Grace period must be shorter than the interval.")}
87                )
88        return attrs

Check that a valid license exists

class LifecycleRuleSerializer.Meta:
29    class Meta:
30        model = LifecycleRule
31        fields = [
32            "id",
33            "name",
34            "content_type",
35            "object_id",
36            "interval",
37            "grace_period",
38            "reviewer_groups",
39            "reviewer_groups_obj",
40            "min_reviewers",
41            "min_reviewers_is_per_group",
42            "reviewers",
43            "reviewers_obj",
44            "notification_transports",
45            "target_verbose",
46        ]
47        read_only_fields = ["id", "reviewers_obj", "reviewer_groups_obj", "target_verbose"]
fields = ['id', 'name', 'content_type', 'object_id', 'interval', 'grace_period', 'reviewer_groups', 'reviewer_groups_obj', 'min_reviewers', 'min_reviewers_is_per_group', 'reviewers', 'reviewers_obj', 'notification_transports', 'target_verbose']
read_only_fields = ['id', 'reviewers_obj', 'reviewer_groups_obj', 'target_verbose']
class LifecycleRuleViewSet(rest_framework.viewsets.ModelViewSet):
# Model viewset for lifecycle rules, serialized with LifecycleRuleSerializer.
# NOTE(review): documented with comments rather than a docstring so the view's
# description (which frameworks may derive from the docstring) stays unchanged.
class LifecycleRuleViewSet(ModelViewSet):
    queryset = LifecycleRule.objects.all()
    serializer_class = LifecycleRuleSerializer

    # Lookups used for filtering and free-text search: the target model name,
    # reviewer group names and reviewer usernames.
    filterset_fields = ["content_type__model"]
    search_fields = ["content_type__model", "reviewer_groups__name", "reviewers__username"]

    # Default sort key and the keys offered for ordering.
    ordering = ["name"]
    ordering_fields = ["name", "content_type__model"]

A viewset that provides default create(), retrieve(), update(), partial_update(), destroy() and list() actions.

queryset = <QuerySet []>
serializer_class = <class 'LifecycleRuleSerializer'>
search_fields = ['content_type__model', 'reviewer_groups__name', 'reviewers__username']
ordering = ['name']
ordering_fields = ['name', 'content_type__model']
filterset_fields = ['content_type__model']
name = None
description = None
suffix = None
detail = None
basename = None