authentik.enterprise.lifecycle.tasks

 1from django.utils.translation import gettext_lazy as _
 2from dramatiq import actor
 3
 4from authentik.core.models import User
 5from authentik.enterprise.lifecycle.models import LifecycleRule
 6from authentik.events.models import Event, Notification, NotificationTransport
 7from authentik.tasks.schedules.models import Schedule
 8
 9
@actor(description=_("Dispatch tasks to apply lifecycle rules."))
def apply_lifecycle_rules():
    """Enqueue one ``apply_lifecycle_rule`` task per lifecycle rule.

    Every dispatched task is sent with ``rel_obj`` set to the ``Schedule``
    whose ``actor_name`` is this actor's dotted path.

    Raises:
        Schedule.DoesNotExist / Schedule.MultipleObjectsReturned: propagated
            from ``Schedule.objects.get`` when at least one rule exists and
            the schedule lookup does not resolve to exactly one row.
    """
    schedule = None
    for rule in LifecycleRule.objects.all():
        if schedule is None:
            # The schedule is the same for every rule, so fetch it once and
            # reuse it instead of issuing one identical query per rule.
            # Fetching lazily keeps the "no rules -> no Schedule query"
            # behaviour of the per-iteration lookup.
            schedule = Schedule.objects.get(
                actor_name="authentik.enterprise.lifecycle.tasks.apply_lifecycle_rules"
            )
        apply_lifecycle_rule.send_with_options(
            args=(rule.id,),
            rel_obj=schedule,
        )
19
20
@actor(description=_("Apply lifecycle rule."))
def apply_lifecycle_rule(rule_id: str):
    """Apply the lifecycle rule whose primary key is ``rule_id``.

    If no such rule is found the task does nothing.
    """
    matched = LifecycleRule.objects.filter(pk=rule_id).first()
    if not matched:
        # Nothing to apply for an unknown primary key.
        return
    matched.apply()
26
27
@actor(description=_("Send lifecycle rule notification."))
def send_notification(transport_pk: int, event_pk: str, user_pk: int, severity: str):
    """Build a notification for an event and hand it to a transport.

    The event, user and transport are each looked up by primary key. If any
    of them cannot be found the task returns without sending anything.

    Args:
        transport_pk: Primary key of the ``NotificationTransport`` to send with.
        event_pk: Primary key of the ``Event`` the notification is about.
        user_pk: Primary key of the ``User`` the notification is for.
        severity: Value stored in the notification's ``severity`` field.
    """
    source_event = Event.objects.filter(pk=event_pk).first()
    if not source_event:
        return
    recipient = User.objects.filter(pk=user_pk).first()
    if not recipient:
        return

    # Body and link fields are copied from the event; the instance is not
    # saved by this function, only passed on to the transport.
    fields = {
        "severity": severity,
        "body": source_event.summary,
        "event": source_event,
        "user": recipient,
        "hyperlink": source_event.hyperlink,
        "hyperlink_label": source_event.hyperlink_label,
    }
    message = Notification(**fields)

    channel = NotificationTransport.objects.filter(pk=transport_pk).first()
    if channel:
        channel.send(message)