1from django.utils.translation import gettext_lazy as _
2from dramatiq import actor
3
4from authentik.core.models import User
5from authentik.enterprise.lifecycle.models import LifecycleRule
6from authentik.events.models import Event, Notification, NotificationTransport
7from authentik.tasks.schedules.models import Schedule
8
9
@actor(description=_("Dispatch tasks to apply lifecycle rules."))
def apply_lifecycle_rules() -> None:
    """Enqueue one ``apply_lifecycle_rule`` task for every lifecycle rule.

    Each task receives the rule's ``id`` as its only argument and is sent with
    the ``Schedule`` whose ``actor_name`` matches this actor as ``rel_obj``.

    Raises:
        Schedule.DoesNotExist: If at least one rule exists but no schedule
            with this actor's name is found.
        Schedule.MultipleObjectsReturned: If at least one rule exists and
            several schedules share this actor's name.
    """
    rules = list(LifecycleRule.objects.all())
    if not rules:
        # Nothing to dispatch, so the schedule lookup (which may raise) is
        # skipped, exactly as when the loop body never ran.
        return

    # The schedule is identical for every rule: fetch it once instead of
    # issuing one extra query per dispatched task.
    schedule = Schedule.objects.get(
        actor_name="authentik.enterprise.lifecycle.tasks.apply_lifecycle_rules"
    )
    for rule in rules:
        apply_lifecycle_rule.send_with_options(
            args=(rule.id,),
            rel_obj=schedule,
        )
19
20
@actor(description=_("Apply lifecycle rule."))
def apply_lifecycle_rule(rule_id: str):
    """Look up the lifecycle rule with primary key ``rule_id`` and apply it.

    Does nothing when no rule with that primary key is found.
    """
    matching_rule = LifecycleRule.objects.filter(pk=rule_id).first()
    if not matching_rule:
        # No such rule: nothing to apply.
        return
    matching_rule.apply()
26
27
@actor(description=_("Send lifecycle rule notification."))
def send_notification(transport_pk: int, event_pk: str, user_pk: int, severity: str):
    """Build a notification for an event and send it through a transport.

    The event, user and transport are looked up by primary key, in that
    order; the function returns without sending anything as soon as one of
    them is not found.

    Args:
        transport_pk: Primary key of the ``NotificationTransport`` to send with.
        event_pk: Primary key of the ``Event`` the notification is about.
        user_pk: Primary key of the ``User`` set on the notification.
        severity: Severity value stored on the notification.
    """
    if not (source_event := Event.objects.filter(pk=event_pk).first()):
        return
    if not (recipient := User.objects.filter(pk=user_pk).first()):
        return

    # The notification is built in memory only; this function never saves it
    # itself and simply hands it to the transport.
    outgoing = Notification(
        severity=severity,
        body=source_event.summary,
        event=source_event,
        user=recipient,
        hyperlink=source_event.hyperlink,
        hyperlink_label=source_event.hyperlink_label,
    )

    if not (channel := NotificationTransport.objects.filter(pk=transport_pk).first()):
        return
    channel.send(outgoing)