authentik.enterprise.lifecycle.models

  1from datetime import timedelta
  2from uuid import uuid4
  3
  4from django.contrib.contenttypes.fields import GenericForeignKey
  5from django.contrib.contenttypes.models import ContentType
  6from django.db import models
  7from django.db.models import Q, QuerySet
  8from django.db.models.fields import Field
  9from django.db.models.functions import Cast
 10from django.http import HttpRequest
 11from django.utils import timezone
 12from django.utils.translation import gettext as _
 13from rest_framework.serializers import BaseSerializer
 14
 15from authentik.blueprints.models import ManagedModel
 16from authentik.core.models import Group, User
 17from authentik.enterprise.lifecycle.utils import link_for_model, start_of_day
 18from authentik.events.models import Event, EventAction, NotificationSeverity, NotificationTransport
 19from authentik.lib.models import SerializerModel
 20from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 21
 22
 23class LifecycleRule(SerializerModel):
 24    id = models.UUIDField(primary_key=True, default=uuid4)
 25    name = models.TextField(unique=True)
 26    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
 27    object_id = models.TextField(null=True, default=None)
 28    object = GenericForeignKey("content_type", "object_id")
 29
 30    interval = models.TextField(
 31        default="days=60",
 32        validators=[timedelta_string_validator],
 33    )
 34    # Grace period starts after a review is due
 35    grace_period = models.TextField(
 36        default="days=30",
 37        validators=[timedelta_string_validator],
 38    )
 39
 40    # The review has to be conducted by `min_reviewers` members of `reviewer_groups`
 41    # (total or per group depending on `min_reviewers_is_per_group` flag) as well
 42    # as all of `reviewers`
 43    reviewer_groups = models.ManyToManyField("authentik_core.Group", blank=True)
 44    min_reviewers = models.PositiveSmallIntegerField(default=1)
 45    min_reviewers_is_per_group = models.BooleanField(default=False)
 46    reviewers = models.ManyToManyField("authentik_core.User", blank=True)
 47
 48    notification_transports = models.ManyToManyField(
 49        NotificationTransport,
 50        help_text=_(
 51            "Select which transports should be used to notify the reviewers. If none are "
 52            "selected, the notification will only be shown in the authentik UI."
 53        ),
 54        blank=True,
 55    )
 56
 57    class Meta:
 58        indexes = [models.Index(fields=["content_type"])]
 59
 60    @property
 61    def serializer(self) -> type[BaseSerializer]:
 62        from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
 63
 64        return LifecycleRuleSerializer
 65
 66    def _get_pk_field(self) -> Field:
 67        model = self.content_type.model_class()
 68        pk = model._meta.pk
 69        while hasattr(pk, "target_field"):
 70            pk = pk.target_field
 71        return pk.__class__()
 72
 73    def get_objects(self) -> QuerySet:
 74        qs = self.content_type.get_all_objects_for_this_type()
 75        if self.object_id:
 76            qs = qs.filter(pk=self.object_id)
 77        return qs
 78
 79    def _get_stale_iterations(self) -> QuerySet[LifecycleIteration]:
 80        filter = ~Q(content_type=self.content_type)
 81        if self.object_id:
 82            filter = filter | ~Q(object_id=self.object_id)
 83        filter = Q(state__in=(ReviewState.PENDING, ReviewState.OVERDUE)) & filter
 84        return self.lifecycleiteration_set.filter(filter)
 85
 86    def _get_newly_overdue_iterations(self) -> QuerySet[LifecycleIteration]:
 87        return self.lifecycleiteration_set.filter(
 88            opened_on__lt=start_of_day(
 89                timezone.now() + timedelta(days=1) - timedelta_from_string(self.grace_period)
 90            ),
 91            state=ReviewState.PENDING,
 92        )
 93
 94    def _get_newly_due_objects(self) -> QuerySet:
 95        recent_iteration_ids = LifecycleIteration.objects.filter(
 96            rule=self,
 97            opened_on__gte=start_of_day(
 98                timezone.now() + timedelta(days=1) - timedelta_from_string(self.interval)
 99            ),
100        ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
101
102        return self.get_objects().exclude(pk__in=recent_iteration_ids)
103
104    def apply(self):
105        self._get_stale_iterations().update(state=ReviewState.CANCELED)
106
107        for iteration in self._get_newly_overdue_iterations():
108            iteration.make_overdue()
109
110        for obj in self._get_newly_due_objects():
111            LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
112
113    def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
114        reviewers = self.reviewers.all()
115        if (
116            iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
117            < reviewers.count()
118        ):
119            return False
120        if self.reviewer_groups.count() == 0:
121            return True
122        if self.min_reviewers_is_per_group:
123            for g in self.reviewer_groups.all():
124                if (
125                    iteration.review_set.filter(
126                        reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
127                    )
128                    .distinct()
129                    .count()
130                    < self.min_reviewers
131                ):
132                    return False
133            return True
134        else:
135            return (
136                iteration.review_set.filter(
137                    reviewer__groups__in=self.reviewer_groups.all().with_descendants()
138                )
139                .distinct()
140                .count()
141                >= self.min_reviewers
142            )
143
144    def get_reviewers(self) -> QuerySet[User]:
145        return User.objects.filter(
146            Q(id__in=self.reviewers.all().values_list("pk", flat=True))
147            | Q(groups__in=self.reviewer_groups.all().with_descendants())
148        ).distinct()
149
150    def notify_reviewers(self, event: Event, severity: str):
151        from authentik.enterprise.lifecycle.tasks import send_notification
152
153        for transport in self.notification_transports.all():
154            for user in self.get_reviewers():
155                send_notification.send_with_options(
156                    args=(transport.pk, event.pk, user.pk, severity),
157                    rel_obj=transport,
158                )
159                if transport.send_once:
160                    break
161
162
class ReviewState(models.TextChoices):
    """States a lifecycle iteration can be in."""

    # Set by `LifecycleIteration.make_reviewed` once the rule is satisfied
    REVIEWED = "REVIEWED", _("Reviewed")
    # Default state of a newly created iteration
    PENDING = "PENDING", _("Pending")
    # Set by `LifecycleIteration.make_overdue`
    OVERDUE = "OVERDUE", _("Overdue")
    # Set by `LifecycleRule.apply` for open iterations that no longer match the rule
    CANCELED = "CANCELED", _("Canceled")
168
169
class LifecycleIteration(SerializerModel, ManagedModel):
    """A single review round for one object, opened on behalf of a ``LifecycleRule``."""

    id = models.UUIDField(primary_key=True, default=uuid4)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.TextField(null=False)
    object = GenericForeignKey("content_type", "object_id")

    # Becomes NULL when the rule is deleted (SET_NULL), so it may be missing
    rule = models.ForeignKey(LifecycleRule, null=True, on_delete=models.SET_NULL)

    state = models.CharField(max_length=10, choices=ReviewState, default=ReviewState.PENDING)
    opened_on = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [models.Index(fields=["content_type", "opened_on"])]

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this model."""
        # Imported lazily, presumably to avoid a circular import with the API module
        from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer

        return LifecycleIterationSerializer

    def _get_model_name(self) -> str:
        """Return the lower-cased display name of the reviewed object's content type."""
        return self.content_type.name.lower()

    def _get_object_label(self) -> str:
        """Return the label used for the reviewed object in event messages.

        Groups and roles are labelled by their ``name`` attribute, everything else by
        ``str()``.
        """
        model_key = (self.content_type.app_label, self.content_type.model)
        if model_key in (("authentik_core", "group"), ("authentik_rbac", "role")):
            return self.object.name
        return str(self.object)

    def _get_event_args(self) -> dict:
        """Return the keyword arguments shared by all events created for this iteration."""
        return {
            "target": self.object,
            "hyperlink": link_for_model(self.object),
            # Constant msgid + format(): an f-string would be interpolated before the
            # catalog lookup and could therefore never be translated
            "hyperlink_label": _("Go to {model}").format(model=self._get_model_name()),
            "lifecycle_iteration": self.id,
        }

    def _new_event(self, action: str, message_template: str) -> Event:
        """Create and save an event whose message is built from ``message_template``.

        ``message_template`` is an already translated string containing the
        ``{model}`` and ``{label}`` placeholders.
        """
        event = Event.new(
            action,
            message=message_template.format(
                model=self._get_model_name(), label=self._get_object_label()
            ),
            **self._get_event_args(),
        )
        event.save()
        return event

    def _notify_reviewers(self, event: Event, severity: str):
        """Forward ``event`` to the rule's reviewers; a no-op when the rule was deleted."""
        if self.rule is not None:
            self.rule.notify_reviewers(event, severity)

    def initialize(self):
        """Emit the "review initiated" event and notify the reviewers."""
        event = self._new_event(
            EventAction.REVIEW_INITIATED,
            _("Access review is due for {model} {label}"),
        )
        self._notify_reviewers(event, NotificationSeverity.NOTICE)

    def make_overdue(self):
        """Mark the iteration as overdue, emit the matching event and notify the reviewers."""
        self.state = ReviewState.OVERDUE

        event = self._new_event(
            EventAction.REVIEW_OVERDUE,
            _("Access review is overdue for {model} {label}"),
        )
        self._notify_reviewers(event, NotificationSeverity.ALERT)
        self.save()

    @staticmethod
    def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
        """Create a new iteration for the given object and announce it to the reviewers."""
        iteration = LifecycleIteration.objects.create(
            content_type=content_type, object_id=object_id, rule=rule
        )
        iteration.initialize()
        return iteration

    def make_reviewed(self, request: HttpRequest):
        """Mark the iteration as reviewed and emit the "review completed" event.

        The event is enriched from ``request`` via ``Event.from_http`` (which is also
        expected to persist it -- NOTE(review): confirm) and saved before the
        reviewers are notified.
        """
        self.state = ReviewState.REVIEWED
        event = Event.new(
            EventAction.REVIEW_COMPLETED,
            message=_("Access review completed for {model} {label}").format(
                model=self._get_model_name(), label=self._get_object_label()
            ),
            **self._get_event_args(),
        ).from_http(request)
        event.save()
        self._notify_reviewers(event, NotificationSeverity.NOTICE)
        self.save()

    def on_review(self, request: HttpRequest):
        """Re-evaluate the iteration after a review was recorded.

        Completes the iteration when the rule is satisfied. Assumes ``rule`` is still
        set.

        Raises:
            AssertionError: if the iteration is neither pending nor overdue.
        """
        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
            raise AssertionError("Review is not pending or overdue")
        if self.rule.is_satisfied_for_iteration(self):
            self.make_reviewed(request)

    def user_can_review(self, user: User) -> bool:
        """Return whether ``user`` may submit a review for this iteration.

        This requires an open (pending/overdue) iteration that still has a rule, no
        previous review by the user, and the user being either an explicit reviewer
        of the rule or a member of one of its reviewer groups.
        """
        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
            return False
        rule = self.rule
        if rule is None:
            # The rule was deleted; nobody is entitled to review any more
            return False
        if self.review_set.filter(reviewer=user).exists():
            return False
        # Explicit reviewers are always eligible, also when reviewer groups are
        # configured: the rule requires a review from each of them.
        if rule.reviewers.filter(pk=user.pk).exists():
            return True
        return any(group.is_member(user) for group in rule.reviewer_groups.all())
266
267
class Review(SerializerModel):
    """A single reviewer's sign-off on a lifecycle iteration."""

    id = models.UUIDField(default=uuid4, primary_key=True)
    iteration = models.ForeignKey(to=LifecycleIteration, on_delete=models.CASCADE)

    reviewer = models.ForeignKey(to="authentik_core.User", on_delete=models.CASCADE)
    # Filled in automatically when the review is created
    timestamp = models.DateTimeField(auto_now_add=True)
    # Optional free-text remark
    note = models.TextField(null=True)

    class Meta:
        # A reviewer can review a given iteration only once
        unique_together = (("iteration", "reviewer"),)

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this model."""
        from authentik.enterprise.lifecycle.api import reviews

        return reviews.ReviewSerializer
class LifecycleRule(authentik.lib.models.SerializerModel):
 24class LifecycleRule(SerializerModel):
 25    id = models.UUIDField(primary_key=True, default=uuid4)
 26    name = models.TextField(unique=True)
 27    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
 28    object_id = models.TextField(null=True, default=None)
 29    object = GenericForeignKey("content_type", "object_id")
 30
 31    interval = models.TextField(
 32        default="days=60",
 33        validators=[timedelta_string_validator],
 34    )
 35    # Grace period starts after a review is due
 36    grace_period = models.TextField(
 37        default="days=30",
 38        validators=[timedelta_string_validator],
 39    )
 40
 41    # The review has to be conducted by `min_reviewers` members of `reviewer_groups`
 42    # (total or per group depending on `min_reviewers_is_per_group` flag) as well
 43    # as all of `reviewers`
 44    reviewer_groups = models.ManyToManyField("authentik_core.Group", blank=True)
 45    min_reviewers = models.PositiveSmallIntegerField(default=1)
 46    min_reviewers_is_per_group = models.BooleanField(default=False)
 47    reviewers = models.ManyToManyField("authentik_core.User", blank=True)
 48
 49    notification_transports = models.ManyToManyField(
 50        NotificationTransport,
 51        help_text=_(
 52            "Select which transports should be used to notify the reviewers. If none are "
 53            "selected, the notification will only be shown in the authentik UI."
 54        ),
 55        blank=True,
 56    )
 57
 58    class Meta:
 59        indexes = [models.Index(fields=["content_type"])]
 60
 61    @property
 62    def serializer(self) -> type[BaseSerializer]:
 63        from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
 64
 65        return LifecycleRuleSerializer
 66
 67    def _get_pk_field(self) -> Field:
 68        model = self.content_type.model_class()
 69        pk = model._meta.pk
 70        while hasattr(pk, "target_field"):
 71            pk = pk.target_field
 72        return pk.__class__()
 73
 74    def get_objects(self) -> QuerySet:
 75        qs = self.content_type.get_all_objects_for_this_type()
 76        if self.object_id:
 77            qs = qs.filter(pk=self.object_id)
 78        return qs
 79
 80    def _get_stale_iterations(self) -> QuerySet[LifecycleIteration]:
 81        filter = ~Q(content_type=self.content_type)
 82        if self.object_id:
 83            filter = filter | ~Q(object_id=self.object_id)
 84        filter = Q(state__in=(ReviewState.PENDING, ReviewState.OVERDUE)) & filter
 85        return self.lifecycleiteration_set.filter(filter)
 86
 87    def _get_newly_overdue_iterations(self) -> QuerySet[LifecycleIteration]:
 88        return self.lifecycleiteration_set.filter(
 89            opened_on__lt=start_of_day(
 90                timezone.now() + timedelta(days=1) - timedelta_from_string(self.grace_period)
 91            ),
 92            state=ReviewState.PENDING,
 93        )
 94
 95    def _get_newly_due_objects(self) -> QuerySet:
 96        recent_iteration_ids = LifecycleIteration.objects.filter(
 97            rule=self,
 98            opened_on__gte=start_of_day(
 99                timezone.now() + timedelta(days=1) - timedelta_from_string(self.interval)
100            ),
101        ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
102
103        return self.get_objects().exclude(pk__in=recent_iteration_ids)
104
105    def apply(self):
106        self._get_stale_iterations().update(state=ReviewState.CANCELED)
107
108        for iteration in self._get_newly_overdue_iterations():
109            iteration.make_overdue()
110
111        for obj in self._get_newly_due_objects():
112            LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
113
114    def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
115        reviewers = self.reviewers.all()
116        if (
117            iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
118            < reviewers.count()
119        ):
120            return False
121        if self.reviewer_groups.count() == 0:
122            return True
123        if self.min_reviewers_is_per_group:
124            for g in self.reviewer_groups.all():
125                if (
126                    iteration.review_set.filter(
127                        reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
128                    )
129                    .distinct()
130                    .count()
131                    < self.min_reviewers
132                ):
133                    return False
134            return True
135        else:
136            return (
137                iteration.review_set.filter(
138                    reviewer__groups__in=self.reviewer_groups.all().with_descendants()
139                )
140                .distinct()
141                .count()
142                >= self.min_reviewers
143            )
144
145    def get_reviewers(self) -> QuerySet[User]:
146        return User.objects.filter(
147            Q(id__in=self.reviewers.all().values_list("pk", flat=True))
148            | Q(groups__in=self.reviewer_groups.all().with_descendants())
149        ).distinct()
150
151    def notify_reviewers(self, event: Event, severity: str):
152        from authentik.enterprise.lifecycle.tasks import send_notification
153
154        for transport in self.notification_transports.all():
155            for user in self.get_reviewers():
156                send_notification.send_with_options(
157                    args=(transport.pk, event.pk, user.pk, severity),
158                    rel_obj=transport,
159                )
160                if transport.send_once:
161                    break

LifecycleRule(id, name, content_type, object_id, interval, grace_period, min_reviewers, min_reviewers_is_per_group)

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

content_type

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def object_id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

object

Provide a generic many-to-one relation through the content_type and object_id fields.

This class also doubles as an accessor to the related object (similar to ForwardManyToOneDescriptor) by adding itself as a model attribute.

def interval(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def grace_period(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

reviewer_groups

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

def min_reviewers(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def min_reviewers_is_per_group(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

reviewers

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

notification_transports

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

serializer: type[rest_framework.serializers.BaseSerializer]
61    @property
62    def serializer(self) -> type[BaseSerializer]:
63        from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
64
65        return LifecycleRuleSerializer

Get serializer for this model

def get_objects(self) -> django.db.models.query.QuerySet:
74    def get_objects(self) -> QuerySet:
75        qs = self.content_type.get_all_objects_for_this_type()
76        if self.object_id:
77            qs = qs.filter(pk=self.object_id)
78        return qs
def apply(self):
105    def apply(self):
106        self._get_stale_iterations().update(state=ReviewState.CANCELED)
107
108        for iteration in self._get_newly_overdue_iterations():
109            iteration.make_overdue()
110
111        for obj in self._get_newly_due_objects():
112            LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
def is_satisfied_for_iteration( self, iteration: LifecycleIteration) -> bool:
114    def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
115        reviewers = self.reviewers.all()
116        if (
117            iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
118            < reviewers.count()
119        ):
120            return False
121        if self.reviewer_groups.count() == 0:
122            return True
123        if self.min_reviewers_is_per_group:
124            for g in self.reviewer_groups.all():
125                if (
126                    iteration.review_set.filter(
127                        reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
128                    )
129                    .distinct()
130                    .count()
131                    < self.min_reviewers
132                ):
133                    return False
134            return True
135        else:
136            return (
137                iteration.review_set.filter(
138                    reviewer__groups__in=self.reviewer_groups.all().with_descendants()
139                )
140                .distinct()
141                .count()
142                >= self.min_reviewers
143            )
def get_reviewers(self) -> django.db.models.query.QuerySet:
145    def get_reviewers(self) -> QuerySet[User]:
146        return User.objects.filter(
147            Q(id__in=self.reviewers.all().values_list("pk", flat=True))
148            | Q(groups__in=self.reviewer_groups.all().with_descendants())
149        ).distinct()
def notify_reviewers(self, event: authentik.events.models.Event, severity: str):
151    def notify_reviewers(self, event: Event, severity: str):
152        from authentik.enterprise.lifecycle.tasks import send_notification
153
154        for transport in self.notification_transports.all():
155            for user in self.get_reviewers():
156                send_notification.send_with_options(
157                    args=(transport.pk, event.pk, user.pk, severity),
158                    rel_obj=transport,
159                )
160                if transport.send_once:
161                    break
content_type_id
def objects(unknown):

The model's manager, used to build queries over LifecycleRule rows (for example `LifecycleRule.objects.filter(...)`).

lifecycleiteration_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class LifecycleRule.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class LifecycleRule.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class ReviewState(django.db.models.enums.TextChoices):
164class ReviewState(models.TextChoices):
165    REVIEWED = "REVIEWED", _("Reviewed")
166    PENDING = "PENDING", _("Pending")
167    OVERDUE = "OVERDUE", _("Overdue")
168    CANCELED = "CANCELED", _("Canceled")

Class for creating enumerated string choices.

171class LifecycleIteration(SerializerModel, ManagedModel):
172    id = models.UUIDField(primary_key=True, default=uuid4)
173    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
174    object_id = models.TextField(null=False)
175    object = GenericForeignKey("content_type", "object_id")
176
177    rule = models.ForeignKey(LifecycleRule, null=True, on_delete=models.SET_NULL)
178
179    state = models.CharField(max_length=10, choices=ReviewState, default=ReviewState.PENDING)
180    opened_on = models.DateTimeField(auto_now_add=True)
181
182    class Meta:
183        indexes = [models.Index(fields=["content_type", "opened_on"])]
184
185    @property
186    def serializer(self) -> type[BaseSerializer]:
187        from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer
188
189        return LifecycleIterationSerializer
190
191    def _get_model_name(self) -> str:
192        return self.content_type.name.lower()
193
194    def _get_event_args(self) -> dict:
195        return {
196            "target": self.object,
197            "hyperlink": link_for_model(self.object),
198            "hyperlink_label": _(f"Go to {self._get_model_name()}"),
199            "lifecycle_iteration": self.id,
200        }
201
202    def initialize(self):
203        if (self.content_type.app_label, self.content_type.model) == ("authentik_core", "group"):
204            object_label = self.object.name
205        elif (self.content_type.app_label, self.content_type.model) == ("authentik_rbac", "role"):
206            object_label = self.object.name
207        else:
208            object_label = str(self.object)
209        event = Event.new(
210            EventAction.REVIEW_INITIATED,
211            message=_(f"Access review is due for {self.content_type.name.lower()} {object_label}"),
212            **self._get_event_args(),
213        )
214        event.save()
215        self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
216
217    def make_overdue(self):
218        self.state = ReviewState.OVERDUE
219
220        event = Event.new(
221            EventAction.REVIEW_OVERDUE,
222            message=_(f"Access review is overdue for {self.content_type.name} {str(self.object)}"),
223            **self._get_event_args(),
224        )
225        event.save()
226        self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
227        self.save()
228
229    @staticmethod
230    def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
231        iteration = LifecycleIteration.objects.create(
232            content_type=content_type, object_id=object_id, rule=rule
233        )
234        iteration.initialize()
235        return iteration
236
237    def make_reviewed(self, request: HttpRequest):
238        self.state = ReviewState.REVIEWED
239        event = Event.new(
240            EventAction.REVIEW_COMPLETED,
241            message=_(f"Access review completed for {self.content_type.name} {str(self.object)}"),
242            **self._get_event_args(),
243        ).from_http(request)
244        event.save()
245        self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
246        self.save()
247
248    def on_review(self, request: HttpRequest):
249        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
250            raise AssertionError("Review is not pending or overdue")
251        if self.rule.is_satisfied_for_iteration(self):
252            self.make_reviewed(request)
253
254    def user_can_review(self, user: User) -> bool:
255        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
256            return False
257        if self.review_set.filter(reviewer=user).exists():
258            return False
259        groups = self.rule.reviewer_groups.all()
260        if groups:
261            for group in groups:
262                if group.is_member(user):
263                    return True
264            return False
265        else:
266            return user in self.rule.get_reviewers()

LifecycleIteration(managed, id, content_type, object_id, rule, state, opened_on)

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

content_type

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def object_id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

object

Provide a generic many-to-one relation through the content_type and object_id fields.

This class also doubles as an accessor to the related object (similar to ForwardManyToOneDescriptor) by adding itself as a model attribute.

rule

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def state(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def opened_on(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
185    @property
186    def serializer(self) -> type[BaseSerializer]:
187        from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer
188
189        return LifecycleIterationSerializer

Get serializer for this model

def initialize(self):
202    def initialize(self):
203        if (self.content_type.app_label, self.content_type.model) == ("authentik_core", "group"):
204            object_label = self.object.name
205        elif (self.content_type.app_label, self.content_type.model) == ("authentik_rbac", "role"):
206            object_label = self.object.name
207        else:
208            object_label = str(self.object)
209        event = Event.new(
210            EventAction.REVIEW_INITIATED,
211            message=_(f"Access review is due for {self.content_type.name.lower()} {object_label}"),
212            **self._get_event_args(),
213        )
214        event.save()
215        self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
def make_overdue(self):
217    def make_overdue(self):
218        self.state = ReviewState.OVERDUE
219
220        event = Event.new(
221            EventAction.REVIEW_OVERDUE,
222            message=_(f"Access review is overdue for {self.content_type.name} {str(self.object)}"),
223            **self._get_event_args(),
224        )
225        event.save()
226        self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
227        self.save()
@staticmethod
def start(content_type: django.contrib.contenttypes.models.ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
229    @staticmethod
230    def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
231        iteration = LifecycleIteration.objects.create(
232            content_type=content_type, object_id=object_id, rule=rule
233        )
234        iteration.initialize()
235        return iteration
def make_reviewed(self, request: django.http.request.HttpRequest):
237    def make_reviewed(self, request: HttpRequest):
238        self.state = ReviewState.REVIEWED
239        event = Event.new(
240            EventAction.REVIEW_COMPLETED,
241            message=_(f"Access review completed for {self.content_type.name} {str(self.object)}"),
242            **self._get_event_args(),
243        ).from_http(request)
244        event.save()
245        self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
246        self.save()
def on_review(self, request: django.http.request.HttpRequest):
248    def on_review(self, request: HttpRequest):
249        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
250            raise AssertionError("Review is not pending or overdue")
251        if self.rule.is_satisfied_for_iteration(self):
252            self.make_reviewed(request)
def user_can_review(self, user: authentik.core.models.User) -> bool:
254    def user_can_review(self, user: User) -> bool:
255        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
256            return False
257        if self.review_set.filter(reviewer=user).exists():
258            return False
259        groups = self.rule.reviewer_groups.all()
260        if groups:
261            for group in groups:
262                if group.is_member(user):
263                    return True
264            return False
265        else:
266            return user in self.rule.get_reviewers()
def managed(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

content_type_id
rule_id
def get_state_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_opened_on(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_opened_on(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def objects(unknown):

The type of the None singleton.

review_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class LifecycleIteration.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class LifecycleIteration.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class Review(authentik.lib.models.SerializerModel):
class Review(SerializerModel):
    """A single reviewer's review of a lifecycle iteration."""

    id = models.UUIDField(primary_key=True, default=uuid4)
    # Deleting the iteration deletes its reviews.
    iteration = models.ForeignKey(LifecycleIteration, on_delete=models.CASCADE)

    # Deleting the user deletes their reviews.
    reviewer = models.ForeignKey("authentik_core.User", on_delete=models.CASCADE)
    # Set once, when the review row is first created.
    timestamp = models.DateTimeField(auto_now_add=True)
    note = models.TextField(null=True)

    class Meta:
        # At most one review per reviewer for a given iteration.
        unique_together = (("iteration", "reviewer"),)

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class used for reviews."""
        # Imported inside the property rather than at module level.
        from authentik.enterprise.lifecycle.api.reviews import (
            ReviewSerializer as serializer_class,
        )

        return serializer_class

Review(id, iteration, reviewer, timestamp, note)

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

iteration

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

reviewer

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def timestamp(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def note(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
280    @property
281    def serializer(self) -> type[BaseSerializer]:
282        from authentik.enterprise.lifecycle.api.reviews import ReviewSerializer
283
284        return ReviewSerializer

Get serializer for this model

iteration_id
reviewer_id
def get_next_by_timestamp(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_timestamp(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def objects(unknown):

The type of the None singleton.

class Review.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Review.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.