authentik.enterprise.lifecycle.models

  1from datetime import timedelta
  2from uuid import uuid4
  3
  4from django.contrib.contenttypes.fields import GenericForeignKey
  5from django.contrib.contenttypes.models import ContentType
  6from django.db import models
  7from django.db.models import Q, QuerySet
  8from django.db.models.fields import Field
  9from django.db.models.functions import Cast
 10from django.http import HttpRequest
 11from django.utils import timezone
 12from django.utils.translation import gettext as _
 13from rest_framework.serializers import BaseSerializer
 14
 15from authentik.blueprints.models import ManagedModel
 16from authentik.core.models import Group, User
 17from authentik.enterprise.lifecycle.utils import link_for_model, start_of_day
 18from authentik.events.models import Event, EventAction, NotificationSeverity, NotificationTransport
 19from authentik.lib.models import SerializerModel
 20from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 21
 22
 23class LifecycleRule(SerializerModel):
 24    id = models.UUIDField(primary_key=True, default=uuid4)
 25    name = models.TextField(unique=True)
 26    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
 27    object_id = models.TextField(null=True, default=None)
 28    object = GenericForeignKey("content_type", "object_id")
 29
 30    interval = models.TextField(
 31        default="days=60",
 32        validators=[timedelta_string_validator],
 33    )
 34    # Grace period starts after a review is due
 35    grace_period = models.TextField(
 36        default="days=30",
 37        validators=[timedelta_string_validator],
 38    )
 39
 40    # The review has to be conducted by `min_reviewers` members of `reviewer_groups`
 41    # (total or per group depending on `min_reviewers_is_per_group` flag) as well
 42    # as all of `reviewers`
 43    reviewer_groups = models.ManyToManyField("authentik_core.Group", blank=True)
 44    min_reviewers = models.PositiveSmallIntegerField(default=1)
 45    min_reviewers_is_per_group = models.BooleanField(default=False)
 46    reviewers = models.ManyToManyField("authentik_core.User", blank=True)
 47
 48    notification_transports = models.ManyToManyField(
 49        NotificationTransport,
 50        help_text=_(
 51            "Select which transports should be used to notify the reviewers. If none are "
 52            "selected, the notification will only be shown in the authentik UI."
 53        ),
 54        blank=True,
 55    )
 56
 57    class Meta:
 58        indexes = [models.Index(fields=["content_type"])]
 59        unique_together = [["content_type", "object_id"]]
 60        constraints = [
 61            models.UniqueConstraint(
 62                fields=["content_type"],
 63                condition=Q(object_id__isnull=True),
 64                name="uniq_lifecycle_rule_ct_null_object",
 65            )
 66        ]
 67
 68    @property
 69    def serializer(self) -> type[BaseSerializer]:
 70        from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
 71
 72        return LifecycleRuleSerializer
 73
 74    def _get_pk_field(self) -> Field:
 75        model = self.content_type.model_class()
 76        pk = model._meta.pk
 77        while hasattr(pk, "target_field"):
 78            pk = pk.target_field
 79        return pk.__class__()
 80
 81    def get_objects(self) -> QuerySet:
 82        qs = self.content_type.get_all_objects_for_this_type()
 83        if self.object_id:
 84            qs = qs.filter(pk=self.object_id)
 85        else:
 86            qs = qs.exclude(
 87                pk__in=LifecycleRule.objects.filter(
 88                    content_type=self.content_type, object_id__isnull=False
 89                ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
 90            )
 91        return qs
 92
 93    def _get_stale_iterations(self) -> QuerySet[LifecycleIteration]:
 94        filter = ~Q(content_type=self.content_type)
 95        if self.object_id:
 96            filter = filter | ~Q(object_id=self.object_id)
 97        filter = Q(state__in=(ReviewState.PENDING, ReviewState.OVERDUE)) & filter
 98        return self.lifecycleiteration_set.filter(filter)
 99
100    def _get_newly_overdue_iterations(self) -> QuerySet[LifecycleIteration]:
101        return self.lifecycleiteration_set.filter(
102            opened_on__lt=start_of_day(
103                timezone.now() + timedelta(days=1) - timedelta_from_string(self.grace_period)
104            ),
105            state=ReviewState.PENDING,
106        )
107
108    def _get_newly_due_objects(self) -> QuerySet:
109        recent_iteration_ids = LifecycleIteration.objects.filter(
110            content_type=self.content_type,
111            object_id__isnull=False,
112            opened_on__gte=start_of_day(
113                timezone.now() + timedelta(days=1) - timedelta_from_string(self.interval)
114            ),
115        ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
116
117        return self.get_objects().exclude(pk__in=recent_iteration_ids)
118
119    def apply(self):
120        self._get_stale_iterations().update(state=ReviewState.CANCELED)
121
122        for iteration in self._get_newly_overdue_iterations():
123            iteration.make_overdue()
124
125        for obj in self._get_newly_due_objects():
126            LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
127
128    def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
129        reviewers = self.reviewers.all()
130        if (
131            iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
132            < reviewers.count()
133        ):
134            return False
135        if self.reviewer_groups.count() == 0:
136            return True
137        if self.min_reviewers_is_per_group:
138            for g in self.reviewer_groups.all():
139                if (
140                    iteration.review_set.filter(
141                        reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
142                    )
143                    .distinct()
144                    .count()
145                    < self.min_reviewers
146                ):
147                    return False
148            return True
149        else:
150            return (
151                iteration.review_set.filter(
152                    reviewer__groups__in=self.reviewer_groups.all().with_descendants()
153                )
154                .distinct()
155                .count()
156                >= self.min_reviewers
157            )
158
159    def get_reviewers(self) -> QuerySet[User]:
160        return User.objects.filter(
161            Q(id__in=self.reviewers.all().values_list("pk", flat=True))
162            | Q(groups__in=self.reviewer_groups.all().with_descendants())
163        ).distinct()
164
165    def notify_reviewers(self, event: Event, severity: str):
166        from authentik.enterprise.lifecycle.tasks import send_notification
167
168        for transport in self.notification_transports.all():
169            for user in self.get_reviewers():
170                send_notification.send_with_options(
171                    args=(transport.pk, event.pk, user.pk, severity),
172                    rel_obj=transport,
173                )
174                if transport.send_once:
175                    break
176
177
class ReviewState(models.TextChoices):
    """States a lifecycle iteration can be in."""

    REVIEWED = ("REVIEWED", _("Reviewed"))
    # PENDING and OVERDUE are the two states in which an iteration is still open
    PENDING = ("PENDING", _("Pending"))
    OVERDUE = ("OVERDUE", _("Overdue"))
    CANCELED = ("CANCELED", _("Canceled"))
183
184
class LifecycleIteration(SerializerModel, ManagedModel):
    """A single review cycle of one object, opened by a `LifecycleRule`."""

    id = models.UUIDField(primary_key=True, default=uuid4)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.TextField(null=False)
    object = GenericForeignKey("content_type", "object_id")

    # Becomes NULL when the rule is deleted, so every use must tolerate None
    rule = models.ForeignKey(LifecycleRule, null=True, on_delete=models.SET_NULL)

    state = models.CharField(max_length=10, choices=ReviewState, default=ReviewState.PENDING)
    opened_on = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [models.Index(fields=["content_type", "opened_on"])]

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this model."""
        from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer

        return LifecycleIterationSerializer

    def _get_model_name(self) -> str:
        """Return the lower-cased name of the reviewed object's content type."""
        return self.content_type.name.lower()

    def _get_event_args(self) -> dict:
        """Return the keyword arguments shared by all events of this iteration."""
        return {
            "target": self.object,
            "hyperlink": link_for_model(self.object),
            # Translate the constant template first, then interpolate; an f-string
            # would be formatted before the catalogue lookup and never match.
            "hyperlink_label": _("Go to {model}").format(model=self._get_model_name()),
            "lifecycle_iteration": self.id,
        }

    def initialize(self):
        """Record the "review initiated" event and notify the rule's reviewers."""
        event = Event.new(
            EventAction.REVIEW_INITIATED,
            message=_("Access review is due for {model} {object}").format(
                model=self.content_type.name, object=str(self.object)
            ),
            **self._get_event_args(),
        )
        event.save()
        if self.rule is not None:
            self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)

    def make_overdue(self):
        """Mark the iteration overdue, record the event and alert the reviewers."""
        self.state = ReviewState.OVERDUE

        event = Event.new(
            EventAction.REVIEW_OVERDUE,
            message=_("Access review is overdue for {model} {object}").format(
                model=self.content_type.name, object=str(self.object)
            ),
            **self._get_event_args(),
        )
        event.save()
        if self.rule is not None:
            self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
        self.save()

    @staticmethod
    def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
        """Create an iteration for the given object under `rule` and initialize it."""
        iteration = LifecycleIteration.objects.create(
            content_type=content_type, object_id=object_id, rule=rule
        )
        iteration.initialize()
        return iteration

    def make_reviewed(self, request: HttpRequest):
        """Mark the iteration reviewed, record the event from `request` and notify reviewers."""
        self.state = ReviewState.REVIEWED
        event = Event.new(
            EventAction.REVIEW_COMPLETED,
            message=_("Access review completed for {model} {object}").format(
                model=self.content_type.name, object=str(self.object)
            ),
            **self._get_event_args(),
        ).from_http(request)
        event.save()
        if self.rule is not None:
            self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
        self.save()

    def on_review(self, request: HttpRequest):
        """Complete the iteration if the rule is now satisfied.

        Raises:
            AssertionError: if the iteration is neither pending nor overdue.
        """
        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
            raise AssertionError("Review is not pending or overdue")
        if self.rule is None:
            # The rule was deleted: there are no criteria left to satisfy
            return
        if self.rule.is_satisfied_for_iteration(self):
            self.make_reviewed(request)

    def user_can_review(self, user: User) -> bool:
        """Return whether `user` may still submit a review for this iteration.

        The iteration must be open, the user must not have reviewed it already,
        and the user must be an explicit reviewer of the rule or a member of one
        of its reviewer groups.
        """
        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
            return False
        if self.review_set.filter(reviewer=user).exists():
            return False
        rule = self.rule
        if rule is None:
            return False
        # Explicit reviewers must always be allowed: the rule is only satisfied
        # once all of them have reviewed, whether or not groups are configured.
        if rule.reviewers.filter(pk=user.pk).exists():
            return True
        return any(group.is_member(user) for group in rule.reviewer_groups.all())
275
276
class Review(SerializerModel):
    """A single reviewer's review of a lifecycle iteration, with an optional note."""

    id = models.UUIDField(primary_key=True, default=uuid4)
    iteration = models.ForeignKey(LifecycleIteration, on_delete=models.CASCADE)

    reviewer = models.ForeignKey("authentik_core.User", on_delete=models.CASCADE)
    timestamp = models.DateTimeField(auto_now_add=True)
    note = models.TextField(null=True)

    class Meta:
        # A user can review a given iteration at most once
        unique_together = [["iteration", "reviewer"]]

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this model."""
        from authentik.enterprise.lifecycle.api.reviews import ReviewSerializer as serializer_cls

        return serializer_cls
class LifecycleRule(authentik.lib.models.SerializerModel):
 24class LifecycleRule(SerializerModel):
 25    id = models.UUIDField(primary_key=True, default=uuid4)
 26    name = models.TextField(unique=True)
 27    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
 28    object_id = models.TextField(null=True, default=None)
 29    object = GenericForeignKey("content_type", "object_id")
 30
 31    interval = models.TextField(
 32        default="days=60",
 33        validators=[timedelta_string_validator],
 34    )
 35    # Grace period starts after a review is due
 36    grace_period = models.TextField(
 37        default="days=30",
 38        validators=[timedelta_string_validator],
 39    )
 40
 41    # The review has to be conducted by `min_reviewers` members of `reviewer_groups`
 42    # (total or per group depending on `min_reviewers_is_per_group` flag) as well
 43    # as all of `reviewers`
 44    reviewer_groups = models.ManyToManyField("authentik_core.Group", blank=True)
 45    min_reviewers = models.PositiveSmallIntegerField(default=1)
 46    min_reviewers_is_per_group = models.BooleanField(default=False)
 47    reviewers = models.ManyToManyField("authentik_core.User", blank=True)
 48
 49    notification_transports = models.ManyToManyField(
 50        NotificationTransport,
 51        help_text=_(
 52            "Select which transports should be used to notify the reviewers. If none are "
 53            "selected, the notification will only be shown in the authentik UI."
 54        ),
 55        blank=True,
 56    )
 57
 58    class Meta:
 59        indexes = [models.Index(fields=["content_type"])]
 60        unique_together = [["content_type", "object_id"]]
 61        constraints = [
 62            models.UniqueConstraint(
 63                fields=["content_type"],
 64                condition=Q(object_id__isnull=True),
 65                name="uniq_lifecycle_rule_ct_null_object",
 66            )
 67        ]
 68
 69    @property
 70    def serializer(self) -> type[BaseSerializer]:
 71        from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
 72
 73        return LifecycleRuleSerializer
 74
 75    def _get_pk_field(self) -> Field:
 76        model = self.content_type.model_class()
 77        pk = model._meta.pk
 78        while hasattr(pk, "target_field"):
 79            pk = pk.target_field
 80        return pk.__class__()
 81
 82    def get_objects(self) -> QuerySet:
 83        qs = self.content_type.get_all_objects_for_this_type()
 84        if self.object_id:
 85            qs = qs.filter(pk=self.object_id)
 86        else:
 87            qs = qs.exclude(
 88                pk__in=LifecycleRule.objects.filter(
 89                    content_type=self.content_type, object_id__isnull=False
 90                ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
 91            )
 92        return qs
 93
 94    def _get_stale_iterations(self) -> QuerySet[LifecycleIteration]:
 95        filter = ~Q(content_type=self.content_type)
 96        if self.object_id:
 97            filter = filter | ~Q(object_id=self.object_id)
 98        filter = Q(state__in=(ReviewState.PENDING, ReviewState.OVERDUE)) & filter
 99        return self.lifecycleiteration_set.filter(filter)
100
101    def _get_newly_overdue_iterations(self) -> QuerySet[LifecycleIteration]:
102        return self.lifecycleiteration_set.filter(
103            opened_on__lt=start_of_day(
104                timezone.now() + timedelta(days=1) - timedelta_from_string(self.grace_period)
105            ),
106            state=ReviewState.PENDING,
107        )
108
109    def _get_newly_due_objects(self) -> QuerySet:
110        recent_iteration_ids = LifecycleIteration.objects.filter(
111            content_type=self.content_type,
112            object_id__isnull=False,
113            opened_on__gte=start_of_day(
114                timezone.now() + timedelta(days=1) - timedelta_from_string(self.interval)
115            ),
116        ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
117
118        return self.get_objects().exclude(pk__in=recent_iteration_ids)
119
120    def apply(self):
121        self._get_stale_iterations().update(state=ReviewState.CANCELED)
122
123        for iteration in self._get_newly_overdue_iterations():
124            iteration.make_overdue()
125
126        for obj in self._get_newly_due_objects():
127            LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
128
129    def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
130        reviewers = self.reviewers.all()
131        if (
132            iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
133            < reviewers.count()
134        ):
135            return False
136        if self.reviewer_groups.count() == 0:
137            return True
138        if self.min_reviewers_is_per_group:
139            for g in self.reviewer_groups.all():
140                if (
141                    iteration.review_set.filter(
142                        reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
143                    )
144                    .distinct()
145                    .count()
146                    < self.min_reviewers
147                ):
148                    return False
149            return True
150        else:
151            return (
152                iteration.review_set.filter(
153                    reviewer__groups__in=self.reviewer_groups.all().with_descendants()
154                )
155                .distinct()
156                .count()
157                >= self.min_reviewers
158            )
159
160    def get_reviewers(self) -> QuerySet[User]:
161        return User.objects.filter(
162            Q(id__in=self.reviewers.all().values_list("pk", flat=True))
163            | Q(groups__in=self.reviewer_groups.all().with_descendants())
164        ).distinct()
165
166    def notify_reviewers(self, event: Event, severity: str):
167        from authentik.enterprise.lifecycle.tasks import send_notification
168
169        for transport in self.notification_transports.all():
170            for user in self.get_reviewers():
171                send_notification.send_with_options(
172                    args=(transport.pk, event.pk, user.pk, severity),
173                    rel_obj=transport,
174                )
175                if transport.send_once:
176                    break

LifecycleRule(id, name, content_type, object_id, interval, grace_period, min_reviewers, min_reviewers_is_per_group)

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

content_type

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def object_id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

object

Provide a generic many-to-one relation through the content_type and object_id fields.

This class also doubles as an accessor to the related object (similar to ForwardManyToOneDescriptor) by adding itself as a model attribute.

def interval(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def grace_period(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

reviewer_groups

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

def min_reviewers(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def min_reviewers_is_per_group(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

reviewers

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

notification_transports

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

serializer: type[rest_framework.serializers.BaseSerializer]
69    @property
70    def serializer(self) -> type[BaseSerializer]:
71        from authentik.enterprise.lifecycle.api.rules import LifecycleRuleSerializer
72
73        return LifecycleRuleSerializer

Get serializer for this model

def get_objects(self) -> django.db.models.query.QuerySet:
82    def get_objects(self) -> QuerySet:
83        qs = self.content_type.get_all_objects_for_this_type()
84        if self.object_id:
85            qs = qs.filter(pk=self.object_id)
86        else:
87            qs = qs.exclude(
88                pk__in=LifecycleRule.objects.filter(
89                    content_type=self.content_type, object_id__isnull=False
90                ).values_list(Cast("object_id", output_field=self._get_pk_field()), flat=True)
91            )
92        return qs
def apply(self):
120    def apply(self):
121        self._get_stale_iterations().update(state=ReviewState.CANCELED)
122
123        for iteration in self._get_newly_overdue_iterations():
124            iteration.make_overdue()
125
126        for obj in self._get_newly_due_objects():
127            LifecycleIteration.start(content_type=self.content_type, object_id=obj.pk, rule=self)
def is_satisfied_for_iteration( self, iteration: LifecycleIteration) -> bool:
129    def is_satisfied_for_iteration(self, iteration: LifecycleIteration) -> bool:
130        reviewers = self.reviewers.all()
131        if (
132            iteration.review_set.filter(reviewer__in=reviewers).distinct("reviewer").count()
133            < reviewers.count()
134        ):
135            return False
136        if self.reviewer_groups.count() == 0:
137            return True
138        if self.min_reviewers_is_per_group:
139            for g in self.reviewer_groups.all():
140                if (
141                    iteration.review_set.filter(
142                        reviewer__groups__in=Group.objects.filter(pk=g.pk).with_descendants()
143                    )
144                    .distinct()
145                    .count()
146                    < self.min_reviewers
147                ):
148                    return False
149            return True
150        else:
151            return (
152                iteration.review_set.filter(
153                    reviewer__groups__in=self.reviewer_groups.all().with_descendants()
154                )
155                .distinct()
156                .count()
157                >= self.min_reviewers
158            )
def get_reviewers(self) -> django.db.models.query.QuerySet:
160    def get_reviewers(self) -> QuerySet[User]:
161        return User.objects.filter(
162            Q(id__in=self.reviewers.all().values_list("pk", flat=True))
163            | Q(groups__in=self.reviewer_groups.all().with_descendants())
164        ).distinct()
def notify_reviewers(self, event: authentik.events.models.Event, severity: str):
166    def notify_reviewers(self, event: Event, severity: str):
167        from authentik.enterprise.lifecycle.tasks import send_notification
168
169        for transport in self.notification_transports.all():
170            for user in self.get_reviewers():
171                send_notification.send_with_options(
172                    args=(transport.pk, event.pk, user.pk, severity),
173                    rel_obj=transport,
174                )
175                if transport.send_once:
176                    break
content_type_id
objects

The default model manager for this model; querysets such as `LifecycleRule.objects.filter(...)` are built from it.

lifecycleiteration_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager() defined below.

class LifecycleRule.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class LifecycleRule.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class ReviewState(django.db.models.enums.TextChoices):
class ReviewState(models.TextChoices):
    """States a lifecycle iteration can be in."""

    REVIEWED = ("REVIEWED", _("Reviewed"))
    # PENDING and OVERDUE are the two states in which an iteration is still open
    PENDING = ("PENDING", _("Pending"))
    OVERDUE = ("OVERDUE", _("Overdue"))
    CANCELED = ("CANCELED", _("Canceled"))

Class for creating enumerated string choices.

class LifecycleIteration(SerializerModel, ManagedModel):
    """A single review cycle of one object, opened by a `LifecycleRule`."""

    id = models.UUIDField(primary_key=True, default=uuid4)
    content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
    object_id = models.TextField(null=False)
    object = GenericForeignKey("content_type", "object_id")

    # Becomes NULL when the rule is deleted, so every use must tolerate None
    rule = models.ForeignKey(LifecycleRule, null=True, on_delete=models.SET_NULL)

    state = models.CharField(max_length=10, choices=ReviewState, default=ReviewState.PENDING)
    opened_on = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [models.Index(fields=["content_type", "opened_on"])]

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this model."""
        from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer

        return LifecycleIterationSerializer

    def _get_model_name(self) -> str:
        """Return the lower-cased name of the reviewed object's content type."""
        return self.content_type.name.lower()

    def _get_event_args(self) -> dict:
        """Return the keyword arguments shared by all events of this iteration."""
        return {
            "target": self.object,
            "hyperlink": link_for_model(self.object),
            # Translate the constant template first, then interpolate; an f-string
            # would be formatted before the catalogue lookup and never match.
            "hyperlink_label": _("Go to {model}").format(model=self._get_model_name()),
            "lifecycle_iteration": self.id,
        }

    def initialize(self):
        """Record the "review initiated" event and notify the rule's reviewers."""
        event = Event.new(
            EventAction.REVIEW_INITIATED,
            message=_("Access review is due for {model} {object}").format(
                model=self.content_type.name, object=str(self.object)
            ),
            **self._get_event_args(),
        )
        event.save()
        if self.rule is not None:
            self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)

    def make_overdue(self):
        """Mark the iteration overdue, record the event and alert the reviewers."""
        self.state = ReviewState.OVERDUE

        event = Event.new(
            EventAction.REVIEW_OVERDUE,
            message=_("Access review is overdue for {model} {object}").format(
                model=self.content_type.name, object=str(self.object)
            ),
            **self._get_event_args(),
        )
        event.save()
        if self.rule is not None:
            self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
        self.save()

    @staticmethod
    def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
        """Create an iteration for the given object under `rule` and initialize it."""
        iteration = LifecycleIteration.objects.create(
            content_type=content_type, object_id=object_id, rule=rule
        )
        iteration.initialize()
        return iteration

    def make_reviewed(self, request: HttpRequest):
        """Mark the iteration reviewed, record the event from `request` and notify reviewers."""
        self.state = ReviewState.REVIEWED
        event = Event.new(
            EventAction.REVIEW_COMPLETED,
            message=_("Access review completed for {model} {object}").format(
                model=self.content_type.name, object=str(self.object)
            ),
            **self._get_event_args(),
        ).from_http(request)
        event.save()
        if self.rule is not None:
            self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
        self.save()

    def on_review(self, request: HttpRequest):
        """Complete the iteration if the rule is now satisfied.

        Raises:
            AssertionError: if the iteration is neither pending nor overdue.
        """
        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
            raise AssertionError("Review is not pending or overdue")
        if self.rule is None:
            # The rule was deleted: there are no criteria left to satisfy
            return
        if self.rule.is_satisfied_for_iteration(self):
            self.make_reviewed(request)

    def user_can_review(self, user: User) -> bool:
        """Return whether `user` may still submit a review for this iteration.

        The iteration must be open, the user must not have reviewed it already,
        and the user must be an explicit reviewer of the rule or a member of one
        of its reviewer groups.
        """
        if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
            return False
        if self.review_set.filter(reviewer=user).exists():
            return False
        rule = self.rule
        if rule is None:
            return False
        # Explicit reviewers must always be allowed: the rule is only satisfied
        # once all of them have reviewed, whether or not groups are configured.
        if rule.reviewers.filter(pk=user.pk).exists():
            return True
        return any(group.is_member(user) for group in rule.reviewer_groups.all())

LifecycleIteration(managed, id, content_type, object_id, rule, state, opened_on)

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

content_type

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def object_id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

object

Provide a generic many-to-one relation through the content_type and object_id fields.

This class also doubles as an accessor to the related object (similar to ForwardManyToOneDescriptor) by adding itself as a model attribute.

rule

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def state(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def opened_on(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
200    @property
201    def serializer(self) -> type[BaseSerializer]:
202        from authentik.enterprise.lifecycle.api.iterations import LifecycleIterationSerializer
203
204        return LifecycleIterationSerializer

Get serializer for this model

def initialize(self):
217    def initialize(self):
218        event = Event.new(
219            EventAction.REVIEW_INITIATED,
220            message=_(f"Access review is due for {self.content_type.name} {str(self.object)}"),
221            **self._get_event_args(),
222        )
223        event.save()
224        self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
def make_overdue(self):
def make_overdue(self):
    """Flag this iteration as overdue and alert the reviewers.

    Sets the state to ``OVERDUE``, saves a ``REVIEW_OVERDUE`` event, notifies
    the rule's reviewers with ``ALERT`` severity and finally saves the
    iteration itself.
    """
    self.state = ReviewState.OVERDUE

    # Translate the constant template first and interpolate afterwards: an
    # f-string is evaluated before gettext sees it, so the interpolated text
    # could never be extracted into, or matched against, a message catalog.
    message = _("Access review is overdue for {content_type} {target}").format(
        content_type=self.content_type.name,
        target=str(self.object),
    )
    event = Event.new(
        EventAction.REVIEW_OVERDUE,
        message=message,
        **self._get_event_args(),
    )
    event.save()
    self.rule.notify_reviewers(event, NotificationSeverity.ALERT)
    # The state change is saved only after the event is stored and
    # notifications are sent (same order as before).
    self.save()
@staticmethod
def start( content_type: django.contrib.contenttypes.models.ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
@staticmethod
def start(content_type: ContentType, object_id: str, rule: LifecycleRule) -> LifecycleIteration:
    """Create a new iteration for the given object and rule, then initialize it.

    Returns the freshly created iteration after ``initialize()`` has run.
    """
    fields = {
        "content_type": content_type,
        "object_id": object_id,
        "rule": rule,
    }
    created = LifecycleIteration.objects.create(**fields)
    created.initialize()
    return created
def make_reviewed(self, request: django.http.request.HttpRequest):
def make_reviewed(self, request: HttpRequest):
    """Mark this iteration as reviewed and announce the completion.

    Sets the state to ``REVIEWED``, saves a ``REVIEW_COMPLETED`` event that is
    passed through ``from_http(request)``, notifies the rule's reviewers with
    ``NOTICE`` severity and finally saves the iteration itself.
    """
    self.state = ReviewState.REVIEWED
    # Translate the constant template first and interpolate afterwards: an
    # f-string is evaluated before gettext sees it, so the interpolated text
    # could never be extracted into, or matched against, a message catalog.
    message = _("Access review completed for {content_type} {target}").format(
        content_type=self.content_type.name,
        target=str(self.object),
    )
    event = Event.new(
        EventAction.REVIEW_COMPLETED,
        message=message,
        **self._get_event_args(),
    ).from_http(request)
    event.save()
    self.rule.notify_reviewers(event, NotificationSeverity.NOTICE)
    self.save()
def on_review(self, request: django.http.request.HttpRequest):
def on_review(self, request: HttpRequest):
    """React to a newly submitted review.

    Raises ``AssertionError`` when the iteration is neither pending nor
    overdue. Otherwise, completes the iteration via ``make_reviewed`` once the
    rule reports that it is satisfied.
    """
    open_states = (ReviewState.PENDING, ReviewState.OVERDUE)
    if self.state not in open_states:
        raise AssertionError("Review is not pending or overdue")
    if not self.rule.is_satisfied_for_iteration(self):
        # Requirements not met yet; leave the iteration open.
        return
    self.make_reviewed(request)
def user_can_review(self, user: authentik.core.models.User) -> bool:
def user_can_review(self, user: User) -> bool:
    """Tell whether ``user`` may still submit a review for this iteration.

    Returns ``False`` when the iteration is neither pending nor overdue, or
    when the user has already reviewed it. Otherwise the answer depends on
    the rule: if it has reviewer groups, the user must be a member of at
    least one of them; if it has none, the user must be among the rule's
    reviewers.
    """
    if self.state not in (ReviewState.PENDING, ReviewState.OVERDUE):
        return False
    already_reviewed = self.review_set.filter(reviewer=user).exists()
    if already_reviewed:
        return False
    reviewer_groups = self.rule.reviewer_groups.all()
    if not reviewer_groups:
        return user in self.rule.get_reviewers()
    # NOTE(review): when groups are configured, explicit reviewers are not
    # consulted here - confirm this is intended.
    return any(group.is_member(user) for group in reviewer_groups)
def managed(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

content_type_id
rule_id
def get_state_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_opened_on(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_opened_on(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def objects(unknown):

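The model's default manager, used to create and query LifecycleIteration instances (for example `LifecycleIteration.objects.create(...)`).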

review_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by Django's create_reverse_many_to_one_manager().

class LifecycleIteration.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class LifecycleIteration.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class Review(authentik.lib.models.SerializerModel):
class Review(SerializerModel):
    # One reviewer's review of a single lifecycle iteration.
    # No class docstring is added, so the auto-generated model ``__doc__``
    # stays as it was.

    id = models.UUIDField(default=uuid4, primary_key=True)
    # Deleting the iteration deletes its reviews as well.
    iteration = models.ForeignKey(LifecycleIteration, on_delete=models.CASCADE)

    # Deleting the user deletes their reviews as well.
    reviewer = models.ForeignKey("authentik_core.User", on_delete=models.CASCADE)
    # Filled in automatically when the row is first created.
    timestamp = models.DateTimeField(auto_now_add=True)
    # Free-text remark; NULL is allowed.
    note = models.TextField(null=True)

    class Meta:
        # A reviewer can have at most one review per iteration.
        unique_together = (("iteration", "reviewer"),)

    @property
    def serializer(self) -> type[BaseSerializer]:
        # Imported at call time rather than module load
        # (presumably to avoid a circular import with the API package - confirm).
        from authentik.enterprise.lifecycle.api.reviews import (
            ReviewSerializer as serializer_cls,
        )

        return serializer_cls

Review(id, iteration, reviewer, timestamp, note)

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

iteration

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

reviewer

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def timestamp(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def note(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
@property
def serializer(self) -> type[BaseSerializer]:
    """Return the serializer class that represents reviews."""
    # Imported at call time rather than module load
    # (presumably to avoid a circular import with the API package - confirm).
    from authentik.enterprise.lifecycle.api.reviews import (
        ReviewSerializer as serializer_cls,
    )

    return serializer_cls

Get serializer for this model

iteration_id
reviewer_id
def get_next_by_timestamp(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_timestamp(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def objects(unknown):

The model's default manager, used to create and query Review instances.

class Review.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Review.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.