authentik.events.tasks
Event notification tasks
1"""Event notification tasks""" 2 3from uuid import UUID 4 5from django.db.models.query_utils import Q 6from django.utils.translation import gettext_lazy as _ 7from dramatiq.actor import actor 8from guardian.shortcuts import get_anonymous_user 9from structlog.stdlib import get_logger 10 11from authentik.core.models import User 12from authentik.events.models import ( 13 Event, 14 Notification, 15 NotificationRule, 16 NotificationTransport, 17) 18from authentik.lib.utils.db import chunked_queryset 19from authentik.policies.engine import PolicyEngine 20from authentik.policies.models import PolicyBinding, PolicyEngineMode 21from authentik.tasks.middleware import CurrentTask 22 23LOGGER = get_logger() 24 25 26@actor(description=_("Dispatch new event notifications.")) 27def event_trigger_dispatch(event_uuid: UUID): 28 for trigger in NotificationRule.objects.all(): 29 event_trigger_handler.send_with_options(args=(event_uuid, trigger.name), rel_obj=trigger) 30 31 32@actor( 33 description=_( 34 "Check if policies attached to NotificationRule match event " 35 "and dispatch notification tasks." 36 ) 37) 38def event_trigger_handler(event_uuid: UUID, trigger_name: str): 39 """Check if policies attached to NotificationRule match event""" 40 self = CurrentTask.get_task() 41 42 event: Event = Event.objects.filter(event_uuid=event_uuid).first() 43 if not event: 44 self.warning("event doesn't exist yet or anymore", event_uuid=event_uuid) 45 return 46 47 trigger: NotificationRule | None = NotificationRule.objects.filter(name=trigger_name).first() 48 if not trigger: 49 return 50 51 if "policy_uuid" in event.context: 52 policy_uuid = event.context["policy_uuid"] 53 if PolicyBinding.objects.filter( 54 target__in=NotificationRule.objects.all().values_list("pbm_uuid", flat=True), 55 policy=policy_uuid, 56 ).exists(): 57 # If policy that caused this event to be created is attached 58 # to *any* NotificationRule, we return early. 59 # This is the most effective way to prevent infinite loops. 60 LOGGER.debug("e(trigger): attempting to prevent infinite loop", trigger=trigger) 61 return 62 63 LOGGER.debug("e(trigger): checking if trigger applies", trigger=trigger) 64 try: 65 user = User.objects.filter(pk=event.user.get("pk")).first() or get_anonymous_user() 66 except User.DoesNotExist: 67 LOGGER.warning("e(trigger): failed to get user", trigger=trigger) 68 return 69 policy_engine = PolicyEngine(trigger, user) 70 policy_engine.mode = PolicyEngineMode.MODE_ANY 71 policy_engine.empty_result = False 72 policy_engine.use_cache = False 73 policy_engine.request.obj = event 74 policy_engine.request.context["event"] = event 75 policy_engine.build() 76 result = policy_engine.result 77 if not result.passing: 78 return 79 80 LOGGER.debug("e(trigger): event trigger matched", trigger=trigger) 81 # Create the notification objects 82 count = 0 83 for transport in trigger.transports.all(): 84 for user in trigger.destination_users(event): 85 notification_transport.send_with_options( 86 args=( 87 transport.pk, 88 event.pk, 89 user.pk, 90 trigger.pk, 91 ), 92 rel_obj=transport, 93 ) 94 count += 1 95 if transport.send_once: 96 break 97 self.info(f"Created {count} notification tasks") 98 99 100@actor(description=_("Send notification.")) 101def notification_transport(transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str): 102 """Send notification over specified transport""" 103 event = Event.objects.filter(pk=event_pk).first() 104 if not event: 105 return 106 user = User.objects.filter(pk=user_pk).first() 107 if not user: 108 return 109 trigger = NotificationRule.objects.filter(pk=trigger_pk).first() 110 if not trigger: 111 return 112 notification = Notification( 113 severity=trigger.severity, 114 body=event.summary, 115 event=event, 116 user=user, 117 hyperlink=event.hyperlink, 118 hyperlink_label=event.hyperlink_label, 119 ) 120 transport: NotificationTransport = NotificationTransport.objects.filter(pk=transport_pk).first() 121 if not transport: 122 return 123 transport.send(notification) 124 125 126@actor(description=_("Cleanup events for GDPR compliance.")) 127def gdpr_cleanup(user_pk: int): 128 """cleanup events from gdpr_compliance""" 129 events = Event.objects.filter(user__pk=user_pk) 130 LOGGER.debug("GDPR cleanup, removing events from user", events=events.count()) 131 for event in chunked_queryset(events): 132 event.delete() 133 134 135@actor(description=_("Cleanup seen notifications and notifications whose event expired.")) 136def notification_cleanup(): 137 """Cleanup seen notifications and notifications whose event expired.""" 138 self = CurrentTask.get_task() 139 notifications = Notification.objects.filter(Q(event=None) | Q(seen=True)) 140 amount = notifications.count() 141 notifications.delete() 142 LOGGER.debug("Expired notifications", amount=amount) 143 self.info(f"Expired {amount} Notifications")
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
event_trigger_handler =
Actor(<function event_trigger_handler>, queue_name='default', actor_name='event_trigger_handler')
Check if policies attached to NotificationRule match event
notification_transport =
Actor(<function notification_transport>, queue_name='default', actor_name='notification_transport')
Send notification over specified transport
gdpr_cleanup =
Actor(<function gdpr_cleanup>, queue_name='default', actor_name='gdpr_cleanup')
cleanup events from gdpr_compliance
notification_cleanup =
Actor(<function notification_cleanup>, queue_name='default', actor_name='notification_cleanup')
Cleanup seen notifications and notifications whose event expired.