authentik.events.tasks

Event notification tasks

  1"""Event notification tasks"""
  2
  3from uuid import UUID
  4
  5from django.db.models.query_utils import Q
  6from django.utils.translation import gettext_lazy as _
  7from dramatiq.actor import actor
  8from guardian.shortcuts import get_anonymous_user
  9from structlog.stdlib import get_logger
 10
 11from authentik.core.models import User
 12from authentik.events.models import (
 13    Event,
 14    Notification,
 15    NotificationRule,
 16    NotificationTransport,
 17)
 18from authentik.lib.utils.db import chunked_queryset
 19from authentik.policies.engine import PolicyEngine
 20from authentik.policies.models import PolicyBinding, PolicyEngineMode
 21from authentik.tasks.middleware import CurrentTask
 22
 23LOGGER = get_logger()
 24
 25
 26@actor(description=_("Dispatch new event notifications."))
 27def event_trigger_dispatch(event_uuid: UUID):
 28    for trigger in NotificationRule.objects.all():
 29        event_trigger_handler.send_with_options(args=(event_uuid, trigger.name), rel_obj=trigger)
 30
 31
 32@actor(
 33    description=_(
 34        "Check if policies attached to NotificationRule match event "
 35        "and dispatch notification tasks."
 36    )
 37)
 38def event_trigger_handler(event_uuid: UUID, trigger_name: str):
 39    """Check if policies attached to NotificationRule match event"""
 40    self = CurrentTask.get_task()
 41
 42    event: Event = Event.objects.filter(event_uuid=event_uuid).first()
 43    if not event:
 44        self.warning("event doesn't exist yet or anymore", event_uuid=event_uuid)
 45        return
 46
 47    trigger: NotificationRule | None = NotificationRule.objects.filter(name=trigger_name).first()
 48    if not trigger:
 49        return
 50
 51    if "policy_uuid" in event.context:
 52        policy_uuid = event.context["policy_uuid"]
 53        if PolicyBinding.objects.filter(
 54            target__in=NotificationRule.objects.all().values_list("pbm_uuid", flat=True),
 55            policy=policy_uuid,
 56        ).exists():
 57            # If policy that caused this event to be created is attached
 58            # to *any* NotificationRule, we return early.
 59            # This is the most effective way to prevent infinite loops.
 60            LOGGER.debug("e(trigger): attempting to prevent infinite loop", trigger=trigger)
 61            return
 62
 63    LOGGER.debug("e(trigger): checking if trigger applies", trigger=trigger)
 64    try:
 65        user = User.objects.filter(pk=event.user.get("pk")).first() or get_anonymous_user()
 66    except User.DoesNotExist:
 67        LOGGER.warning("e(trigger): failed to get user", trigger=trigger)
 68        return
 69    policy_engine = PolicyEngine(trigger, user)
 70    policy_engine.mode = PolicyEngineMode.MODE_ANY
 71    policy_engine.empty_result = False
 72    policy_engine.use_cache = False
 73    policy_engine.request.obj = event
 74    policy_engine.request.context["event"] = event
 75    policy_engine.build()
 76    result = policy_engine.result
 77    if not result.passing:
 78        return
 79
 80    LOGGER.debug("e(trigger): event trigger matched", trigger=trigger)
 81    # Create the notification objects
 82    count = 0
 83    for transport in trigger.transports.all():
 84        for user in trigger.destination_users(event):
 85            notification_transport.send_with_options(
 86                args=(
 87                    transport.pk,
 88                    event.pk,
 89                    user.pk,
 90                    trigger.pk,
 91                ),
 92                rel_obj=transport,
 93            )
 94            count += 1
 95            if transport.send_once:
 96                break
 97    self.info(f"Created {count} notification tasks")
 98
 99
100@actor(description=_("Send notification."))
101def notification_transport(transport_pk: int, event_pk: str, user_pk: int, trigger_pk: str):
102    """Send notification over specified transport"""
103    event = Event.objects.filter(pk=event_pk).first()
104    if not event:
105        return
106    user = User.objects.filter(pk=user_pk).first()
107    if not user:
108        return
109    trigger = NotificationRule.objects.filter(pk=trigger_pk).first()
110    if not trigger:
111        return
112    notification = Notification(
113        severity=trigger.severity,
114        body=event.summary,
115        event=event,
116        user=user,
117        hyperlink=event.hyperlink,
118        hyperlink_label=event.hyperlink_label,
119    )
120    transport: NotificationTransport = NotificationTransport.objects.filter(pk=transport_pk).first()
121    if not transport:
122        return
123    transport.send(notification)
124
125
126@actor(description=_("Cleanup events for GDPR compliance."))
127def gdpr_cleanup(user_pk: int):
128    """cleanup events from gdpr_compliance"""
129    events = Event.objects.filter(user__pk=user_pk)
130    LOGGER.debug("GDPR cleanup, removing events from user", events=events.count())
131    for event in chunked_queryset(events):
132        event.delete()
133
134
135@actor(description=_("Cleanup seen notifications and notifications whose event expired."))
136def notification_cleanup():
137    """Cleanup seen notifications and notifications whose event expired."""
138    self = CurrentTask.get_task()
139    notifications = Notification.objects.filter(Q(event=None) | Q(seen=True))
140    amount = notifications.count()
141    notifications.delete()
142    LOGGER.debug("Expired notifications", amount=amount)
143    self.info(f"Expired {amount} Notifications")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
event_trigger_handler = Actor(<function event_trigger_handler>, queue_name='default', actor_name='event_trigger_handler')

Check if policies attached to NotificationRule match event

notification_transport = Actor(<function notification_transport>, queue_name='default', actor_name='notification_transport')

Send notification over specified transport

gdpr_cleanup = Actor(<function gdpr_cleanup>, queue_name='default', actor_name='gdpr_cleanup')

cleanup events from gdpr_compliance

notification_cleanup = Actor(<function notification_cleanup>, queue_name='default', actor_name='notification_cleanup')

Cleanup seen notifications and notifications whose event expired.