authentik.enterprise.lifecycle.tasks

 1from django.utils.translation import gettext_lazy as _
 2from dramatiq import actor
 3
 4from authentik.core.models import User
 5from authentik.enterprise.lifecycle.models import LifecycleRule
 6from authentik.events.models import Event, Notification, NotificationTransport
 7
 8
 9@actor(description=_("Dispatch tasks to validate lifecycle rules."))
10def apply_lifecycle_rules():
11    for rule in LifecycleRule.objects.all():
12        apply_lifecycle_rule.send_with_options(
13            args=(rule.id,),
14            rel_obj=rule,
15        )
16
17
18@actor(description=_("Apply lifecycle rule."))
19def apply_lifecycle_rule(rule_id: str):
20    rule = LifecycleRule.objects.filter(pk=rule_id).first()
21    if rule:
22        rule.apply()
23
24
25@actor(description=_("Send lifecycle rule notification."))
26def send_notification(transport_pk: int, event_pk: str, user_pk: int, severity: str):
27    event = Event.objects.filter(pk=event_pk).first()
28    if not event:
29        return
30    user = User.objects.filter(pk=user_pk).first()
31    if not user:
32        return
33
34    notification = Notification(
35        severity=severity,
36        body=event.summary,
37        event=event,
38        user=user,
39        hyperlink=event.hyperlink,
40        hyperlink_label=event.hyperlink_label,
41    )
42    transport = NotificationTransport.objects.filter(pk=transport_pk).first()
43    if not transport:
44        return
45    transport.send(notification)