1from django.utils.translation import gettext_lazy as _
2from dramatiq import actor
3
4from authentik.core.models import User
5from authentik.enterprise.lifecycle.models import LifecycleRule
6from authentik.events.models import Event, Notification, NotificationTransport
7
8
9@actor(description=_("Dispatch tasks to validate lifecycle rules."))
10def apply_lifecycle_rules():
11 for rule in LifecycleRule.objects.all():
12 apply_lifecycle_rule.send_with_options(
13 args=(rule.id,),
14 rel_obj=rule,
15 )
16
17
18@actor(description=_("Apply lifecycle rule."))
19def apply_lifecycle_rule(rule_id: str):
20 rule = LifecycleRule.objects.filter(pk=rule_id).first()
21 if rule:
22 rule.apply()
23
24
25@actor(description=_("Send lifecycle rule notification."))
26def send_notification(transport_pk: int, event_pk: str, user_pk: int, severity: str):
27 event = Event.objects.filter(pk=event_pk).first()
28 if not event:
29 return
30 user = User.objects.filter(pk=user_pk).first()
31 if not user:
32 return
33
34 notification = Notification(
35 severity=severity,
36 body=event.summary,
37 event=event,
38 user=user,
39 hyperlink=event.hyperlink,
40 hyperlink_label=event.hyperlink_label,
41 )
42 transport = NotificationTransport.objects.filter(pk=transport_pk).first()
43 if not transport:
44 return
45 transport.send(notification)