authentik.events.models

authentik events models

  1"""authentik events models"""
  2
  3from collections.abc import Generator
  4from datetime import timedelta
  5from difflib import get_close_matches
  6from functools import lru_cache
  7from inspect import currentframe
  8from typing import Any
  9from uuid import uuid4
 10
 11from asgiref.sync import async_to_sync
 12from channels.layers import get_channel_layer
 13from django.apps import apps
 14from django.db import models
 15from django.db.models import Q
 16from django.http import HttpRequest
 17from django.http.request import QueryDict
 18from django.utils.timezone import now
 19from django.utils.translation import gettext as _
 20from requests import RequestException
 21from rest_framework.serializers import Serializer
 22from structlog.stdlib import get_logger
 23
 24from authentik import authentik_full_version
 25from authentik.brands.models import Brand
 26from authentik.brands.utils import DEFAULT_BRAND
 27from authentik.core.middleware import (
 28    SESSION_KEY_IMPERSONATE_ORIGINAL_USER,
 29    SESSION_KEY_IMPERSONATE_USER,
 30)
 31from authentik.core.models import ExpiringModel, Group, PropertyMapping, User
 32from authentik.crypto.models import CertificateKeyPair
 33from authentik.events.context_processors.base import get_context_processors
 34from authentik.events.utils import (
 35    cleanse_dict,
 36    get_user,
 37    model_to_dict,
 38    sanitize_dict,
 39    sanitize_item,
 40)
 41from authentik.lib.models import DomainlessURLValidator, SerializerModel
 42from authentik.lib.sentry import SentryIgnoredException
 43from authentik.lib.utils.errors import exception_to_dict
 44from authentik.lib.utils.http import get_http_session
 45from authentik.lib.utils.time import timedelta_from_string
 46from authentik.outposts.docker_tls import DockerInlineTLS
 47from authentik.policies.models import PolicyBindingModel
 48from authentik.root.middleware import ClientIPMiddleware
 49from authentik.root.ws.consumer import build_user_group
 50from authentik.stages.email.models import EmailTemplates
 51from authentik.stages.email.utils import TemplateEmailMessage
 52from authentik.tasks.models import TasksModel
 53from authentik.tenants.models import Tenant
 54from authentik.tenants.utils import get_current_tenant
 55
 56LOGGER = get_logger()
 57DISCORD_FIELD_LIMIT = 25
 58NOTIFICATION_SUMMARY_LENGTH = 75
 59
 60
 61def default_event_duration():
 62    """Default duration an Event is saved.
 63    This is used as a fallback when no brand is available"""
 64    try:
 65        tenant = get_current_tenant(only=["event_retention"])
 66        return now() + timedelta_from_string(tenant.event_retention)
 67    except Tenant.DoesNotExist:
 68        return now() + timedelta(days=365)
 69
 70
 71def default_brand():
 72    """Get a default value for brand"""
 73    return sanitize_dict(model_to_dict(DEFAULT_BRAND))
 74
 75
 76@lru_cache
 77def django_app_names() -> list[str]:
 78    """Get a cached list of all django apps' names (not labels)"""
 79    return [x.name for x in apps.app_configs.values()]
 80
 81
 82class NotificationTransportError(SentryIgnoredException):
 83    """Error raised when a notification fails to be delivered"""
 84
 85
 86class EventAction(models.TextChoices):
 87    """All possible actions to save into the events log"""
 88
 89    LOGIN = "login"
 90    LOGIN_FAILED = "login_failed"
 91    LOGOUT = "logout"
 92
 93    USER_WRITE = "user_write"
 94    SUSPICIOUS_REQUEST = "suspicious_request"
 95    PASSWORD_SET = "password_set"  # noqa # nosec
 96
 97    SECRET_VIEW = "secret_view"  # noqa # nosec
 98    SECRET_ROTATE = "secret_rotate"  # noqa # nosec
 99
100    INVITE_USED = "invitation_used"
101
102    AUTHORIZE_APPLICATION = "authorize_application"
103    SOURCE_LINKED = "source_linked"
104
105    IMPERSONATION_STARTED = "impersonation_started"
106    IMPERSONATION_ENDED = "impersonation_ended"
107
108    FLOW_EXECUTION = "flow_execution"
109    POLICY_EXECUTION = "policy_execution"
110    POLICY_EXCEPTION = "policy_exception"
111    PROPERTY_MAPPING_EXCEPTION = "property_mapping_exception"
112
113    SYSTEM_TASK_EXECUTION = "system_task_execution"
114    SYSTEM_TASK_EXCEPTION = "system_task_exception"
115    SYSTEM_EXCEPTION = "system_exception"
116
117    CONFIGURATION_ERROR = "configuration_error"
118    CONFIGURATION_WARNING = "configuration_warning"
119
120    MODEL_CREATED = "model_created"
121    MODEL_UPDATED = "model_updated"
122    MODEL_DELETED = "model_deleted"
123    EMAIL_SENT = "email_sent"
124
125    UPDATE_AVAILABLE = "update_available"
126
127    EXPORT_READY = "export_ready"
128
129    REVIEW_INITIATED = "review_initiated"
130    REVIEW_OVERDUE = "review_overdue"
131    REVIEW_ATTESTED = "review_attested"
132    REVIEW_COMPLETED = "review_completed"
133
134    CUSTOM_PREFIX = "custom_"
135
136
137class Event(SerializerModel, ExpiringModel):
138    """An individual Audit/Metrics/Notification/Error Event"""
139
140    event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
141    user = models.JSONField(default=dict)
142    action = models.TextField(choices=EventAction.choices)
143    app = models.TextField()
144    context = models.JSONField(default=dict, blank=True)
145    client_ip = models.GenericIPAddressField(null=True)
146    created = models.DateTimeField(auto_now_add=True)
147    brand = models.JSONField(default=default_brand, blank=True)
148
149    # Shadow the expires attribute from ExpiringModel to override the default duration
150    expires = models.DateTimeField(default=default_event_duration)
151
152    @staticmethod
153    def _get_app_from_request(request: HttpRequest) -> str:
154        if not isinstance(request, HttpRequest):
155            return ""
156        return request.resolver_match.app_name
157
158    @staticmethod
159    def new(
160        action: str | EventAction,
161        app: str | None = None,
162        **kwargs,
163    ) -> Event:
164        """Create new Event instance from arguments. Instance is NOT saved."""
165        if not isinstance(action, EventAction):
166            action = EventAction.CUSTOM_PREFIX + action
167        if not app:
168            current = currentframe()
169            parent = current.f_back
170            app = parent.f_globals["__name__"]
171            # Attempt to match the calling module to the django app it belongs to
172            # if we can't find a match, keep the module name
173            django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
174            # Also ensure that closest django app has the correct prefix
175            if len(django_apps) > 0 and django_apps[0].startswith(app):
176                app = django_apps[0]
177        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
178        event = Event(action=action, app=app, context=cleaned_kwargs)
179        return event
180
181    def with_exception(self, exc: Exception) -> Event:
182        """Add data from 'exc' to the event in a database-saveable format"""
183        self.context.setdefault("message", str(exc))
184        self.context["exception"] = exception_to_dict(exc)
185        return self
186
187    def set_user(self, user: User) -> Event:
188        """Set `.user` based on user, ensuring the correct attributes are copied.
189        This should only be used when self.from_http is *not* used."""
190        self.user = get_user(user)
191        return self
192
193    def from_http(self, request: HttpRequest, user: User | None = None) -> Event:
194        """Add data from a Django-HttpRequest, allowing the creation of
195        Events independently from requests.
196        `user` arguments optionally overrides user from requests."""
197        if request:
198            from authentik.flows.views.executor import QS_QUERY
199
200            self.context["http_request"] = {
201                "path": request.path,
202                "method": request.method,
203                "args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
204                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
205            }
206            if hasattr(request, "request_id"):
207                self.context["http_request"]["request_id"] = request.request_id
208            # Special case for events created during flow execution
209            # since they keep the http query within a wrapped query
210            if QS_QUERY in self.context["http_request"]["args"]:
211                wrapped = self.context["http_request"]["args"][QS_QUERY]
212                self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
213        if hasattr(request, "brand"):
214            brand: Brand = request.brand
215            self.brand = sanitize_dict(model_to_dict(brand))
216        if hasattr(request, "user"):
217            self.user = get_user(request.user)
218        if user:
219            self.user = get_user(user)
220        if hasattr(request, "session"):
221            from authentik.flows.views.executor import SESSION_KEY_PLAN
222
223            # Check if we're currently impersonating, and add that user
224            if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
225                self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
226                self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
227            # Special case for events that happen during a flow, the user might not be authenticated
228            # yet but is a pending user instead
229            if SESSION_KEY_PLAN in request.session:
230                from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
231
232                plan: FlowPlan = request.session[SESSION_KEY_PLAN]
233                pending_user = plan.context.get(PLAN_CONTEXT_PENDING_USER, None)
234                # Only save `authenticated_as` if there's a different pending user in the flow
235                # than the user that is authenticated
236                if pending_user and (
237                    (pending_user.pk and pending_user.pk != self.user.get("pk"))
238                    or (not pending_user.pk)
239                ):
240                    orig_user = self.user.copy()
241
242                    self.user = {"authenticated_as": orig_user, **get_user(pending_user)}
243        # User 255.255.255.255 as fallback if IP cannot be determined
244        self.client_ip = ClientIPMiddleware.get_client_ip(request)
245        # Enrich event data
246        for processor in get_context_processors():
247            processor.enrich_event(self)
248        # If there's no app set, we get it from the requests too
249        if not self.app:
250            self.app = Event._get_app_from_request(request)
251        self.save()
252        return self
253
254    @staticmethod
255    def log_deprecation(
256        identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs
257    ):
258        query = Q(
259            action=EventAction.CONFIGURATION_WARNING,
260            context__deprecation=identifier,
261        )
262        if cause:
263            query &= Q(context__cause=cause)
264        if Event.objects.filter(query).exists():
265            return
266        event = Event.new(
267            EventAction.CONFIGURATION_WARNING,
268            deprecation=identifier,
269            message=message,
270            cause=cause,
271            **kwargs,
272        )
273        event.expires = now() + timedelta(days=expiry_days)
274        event.save()
275
276    def save(self, *args, **kwargs):
277        if self._state.adding:
278            LOGGER.info(
279                "Created Event",
280                action=self.action,
281                context=self.context,
282                client_ip=self.client_ip,
283                user=self.user,
284            )
285        super().save(*args, **kwargs)
286
287    @property
288    def serializer(self) -> type[Serializer]:
289        from authentik.events.api.events import EventSerializer
290
291        return EventSerializer
292
293    @property
294    def summary(self) -> str:
295        """Return a summary of this event."""
296        if "message" in self.context:
297            return self.context["message"]
298        return f"{self.action}: {self.context}"
299
300    @property
301    def hyperlink(self) -> str | None:
302        return self.context.get("hyperlink")
303
304    @property
305    def hyperlink_label(self) -> str | None:
306        return self.context.get("hyperlink_label")
307
308    def __str__(self) -> str:
309        return f"Event action={self.action} user={self.user} context={self.context}"
310
311    class Meta:
312        verbose_name = _("Event")
313        verbose_name_plural = _("Events")
314        indexes = ExpiringModel.Meta.indexes + [
315            models.Index(fields=["action"]),
316            models.Index(fields=["user"]),
317            models.Index(fields=["app"]),
318            models.Index(fields=["created"]),
319            models.Index(fields=["client_ip"]),
320            models.Index(
321                models.F("context__authorized_application"),
322                name="authentik_e_ctx_app__idx",
323            ),
324        ]
325
326
327class TransportMode(models.TextChoices):
328    """Modes that a notification transport can send a notification"""
329
330    LOCAL = "local", _("authentik inbuilt notifications")
331    WEBHOOK = "webhook", _("Generic Webhook")
332    WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)")
333    EMAIL = "email", _("Email")
334
335
336class NotificationTransport(TasksModel, SerializerModel):
337    """Action which is executed when a Rule matches"""
338
339    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
340
341    name = models.TextField(unique=True)
342    mode = models.TextField(choices=TransportMode.choices, default=TransportMode.LOCAL)
343    send_once = models.BooleanField(
344        default=False,
345        help_text=_(
346            "Only send notification once, for example when sending a webhook into a chat channel."
347        ),
348    )
349
350    email_subject_prefix = models.TextField(default="authentik Notification: ", blank=True)
351    email_template = models.TextField(default=EmailTemplates.EVENT_NOTIFICATION)
352
353    webhook_url = models.TextField(blank=True, validators=[DomainlessURLValidator()])
354    webhook_ca = models.ForeignKey(
355        CertificateKeyPair,
356        null=True,
357        default=None,
358        on_delete=models.SET_DEFAULT,
359        help_text=_(
360            "When set, the selected ceritifcate is used to "
361            "validate the certificate of the webhook server."
362        ),
363    )
364    webhook_mapping_body = models.ForeignKey(
365        "NotificationWebhookMapping",
366        on_delete=models.SET_DEFAULT,
367        null=True,
368        default=None,
369        related_name="+",
370        help_text=_(
371            "Customize the body of the request. "
372            "Mapping should return data that is JSON-serializable."
373        ),
374    )
375    webhook_mapping_headers = models.ForeignKey(
376        "NotificationWebhookMapping",
377        on_delete=models.SET_DEFAULT,
378        null=True,
379        default=None,
380        related_name="+",
381        help_text=_(
382            "Configure additional headers to be sent. "
383            "Mapping should return a dictionary of key-value pairs"
384        ),
385    )
386
387    def send(self, notification: Notification) -> list[str]:
388        """Send notification to user, called from async task"""
389        if self.mode == TransportMode.LOCAL:
390            return self.send_local(notification)
391        if self.mode == TransportMode.WEBHOOK:
392            return self.send_webhook(notification)
393        if self.mode == TransportMode.WEBHOOK_SLACK:
394            return self.send_webhook_slack(notification)
395        if self.mode == TransportMode.EMAIL:
396            return self.send_email(notification)
397        raise ValueError(f"Invalid mode {self.mode} set")
398
399    def send_local(self, notification: Notification) -> list[str]:
400        """Local notification delivery"""
401        if self.webhook_mapping_body:
402            self.webhook_mapping_body.evaluate(
403                user=notification.user,
404                request=None,
405                notification=notification,
406            )
407        notification.save()
408        layer = get_channel_layer()
409        async_to_sync(layer.group_send)(
410            build_user_group(notification.user),
411            {
412                "type": "event.notification",
413                "id": str(notification.pk),
414                "data": notification.serializer(notification).data,
415            },
416        )
417        return []
418
419    def send_webhook(self, notification: Notification) -> list[str]:
420        """Send notification to generic webhook"""
421        default_body = {
422            "body": notification.body,
423            "severity": notification.severity,
424            "user_email": notification.user.email,
425            "user_username": notification.user.username,
426        }
427        if notification.event and notification.event.user:
428            default_body["event_user_email"] = notification.event.user.get("email", None)
429            default_body["event_user_username"] = notification.event.user.get("username", None)
430        headers = {}
431        if self.webhook_mapping_body:
432            default_body = sanitize_item(
433                self.webhook_mapping_body.evaluate(
434                    user=notification.user,
435                    request=None,
436                    notification=notification,
437                )
438            )
439        if self.webhook_mapping_headers:
440            headers = sanitize_item(
441                self.webhook_mapping_headers.evaluate(
442                    user=notification.user,
443                    request=None,
444                    notification=notification,
445                )
446            )
447
448        def send(**kwargs):
449            try:
450                response = get_http_session().post(
451                    self.webhook_url,
452                    json=default_body,
453                    headers=headers,
454                    **kwargs,
455                )
456                response.raise_for_status()
457            except RequestException as exc:
458                raise NotificationTransportError(
459                    exc.response.text if exc.response else str(exc)
460                ) from exc
461            return [
462                response.status_code,
463                response.text,
464            ]
465
466        if self.webhook_ca:
467            with DockerInlineTLS(self.webhook_ca, authentication_kp=None) as tls:
468                return send(verify=tls.ca_cert)
469        return send()
470
471    def send_webhook_slack(self, notification: Notification) -> list[str]:
472        """Send notification to slack or slack-compatible endpoints"""
473        fields = [
474            {
475                "title": _("Severity"),
476                "value": notification.severity,
477                "short": True,
478            },
479            {
480                "title": _("Dispatched for user"),
481                "value": str(notification.user),
482                "short": True,
483            },
484        ]
485        if notification.event:
486            if notification.event.user:
487                fields.append(
488                    {
489                        "title": _("Event user"),
490                        "value": str(notification.event.user.get("username")),
491                        "short": True,
492                    },
493                )
494            for key, value in notification.event.context.items():
495                if not isinstance(value, str):
496                    continue
497                # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
498                if len(fields) >= DISCORD_FIELD_LIMIT:
499                    continue
500                fields.append({"title": key[:256], "value": value[:1024]})
501        body = {
502            "username": "authentik",
503            "icon_url": "https://goauthentik.io/img/icon.png",
504            "attachments": [
505                {
506                    "author_name": "authentik",
507                    "author_link": "https://goauthentik.io",
508                    "author_icon": "https://goauthentik.io/img/icon.png",
509                    "title": notification.body,
510                    "color": "#fd4b2d",
511                    "fields": fields,
512                    "footer": f"authentik {authentik_full_version()}",
513                }
514            ],
515        }
516        if notification.event:
517            body["attachments"][0]["title"] = notification.event.action
518        try:
519            response = get_http_session().post(self.webhook_url, json=body)
520            response.raise_for_status()
521        except RequestException as exc:
522            text = exc.response.text if exc.response else str(exc)
523            raise NotificationTransportError(text) from exc
524        return [
525            response.status_code,
526            response.text,
527        ]
528
529    def send_email(self, notification: Notification) -> list[str]:
530        """Send notification via global email configuration"""
531        from authentik.stages.email.tasks import send_mail
532
533        if notification.user.email.strip() == "":
534            LOGGER.info(
535                "Discarding notification as user has no email address",
536                user=notification.user,
537                notification=notification,
538            )
539            return None
540        context = {
541            "key_value": {
542                "user_email": notification.user.email,
543                "user_username": notification.user.username,
544            },
545            "body": notification.body,
546            "title": "",
547        }
548        if notification.event and notification.event.user:
549            context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
550            context["key_value"]["event_user_username"] = notification.event.user.get(
551                "username", None
552            )
553        if notification.hyperlink:
554            context["link"] = {
555                "target": notification.hyperlink,
556                "label": notification.hyperlink_label,
557            }
558        if notification.event:
559            context["title"] += notification.event.action
560            for key, value in notification.event.context.items():
561                if not isinstance(value, str):
562                    continue
563                context["key_value"][key] = value
564        else:
565            context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
566        # TODO: improve permission check
567        if notification.user.is_superuser:
568            context["source"] = {
569                "from": self.name,
570            }
571        mail = TemplateEmailMessage(
572            subject=self.email_subject_prefix + context["title"],
573            to=[(notification.user.name, notification.user.email)],
574            language=notification.user.locale(),
575            template_name=self.email_template,
576            template_context=context,
577        )
578        send_mail.send_with_options(args=(mail.__dict__,), rel_obj=self)
579        return []
580
581    @property
582    def serializer(self) -> type[Serializer]:
583        from authentik.events.api.notification_transports import (
584            NotificationTransportSerializer,
585        )
586
587        return NotificationTransportSerializer
588
589    def __str__(self) -> str:
590        return f"Notification Transport {self.name}"
591
592    class Meta:
593        verbose_name = _("Notification Transport")
594        verbose_name_plural = _("Notification Transports")
595
596
597class NotificationSeverity(models.TextChoices):
598    """Severity images that a notification can have"""
599
600    NOTICE = "notice", _("Notice")
601    WARNING = "warning", _("Warning")
602    ALERT = "alert", _("Alert")
603
604
605class Notification(SerializerModel):
606    """Event Notification"""
607
608    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
609    severity = models.TextField(choices=NotificationSeverity.choices)
610    body = models.TextField()
611    hyperlink = models.TextField(blank=True, null=True, max_length=4096)
612    hyperlink_label = models.TextField(blank=True, null=True)
613    created = models.DateTimeField(auto_now_add=True)
614    event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
615    seen = models.BooleanField(default=False)
616    user = models.ForeignKey(User, on_delete=models.CASCADE)
617
618    @property
619    def serializer(self) -> type[Serializer]:
620        from authentik.events.api.notifications import NotificationSerializer
621
622        return NotificationSerializer
623
624    def __str__(self) -> str:
625        body_trunc = (
626            (self.body[:NOTIFICATION_SUMMARY_LENGTH] + "..")
627            if len(self.body) > NOTIFICATION_SUMMARY_LENGTH
628            else self.body
629        )
630        return f"Notification for user {self.user_id}: {body_trunc}"
631
632    class Meta:
633        verbose_name = _("Notification")
634        verbose_name_plural = _("Notifications")
635
636
637class NotificationRule(TasksModel, SerializerModel, PolicyBindingModel):
638    """Decide when to create a Notification based on policies attached to this object."""
639
640    name = models.TextField(unique=True)
641    transports = models.ManyToManyField(
642        NotificationTransport,
643        help_text=_(
644            "Select which transports should be used to notify the user. If none are "
645            "selected, the notification will only be shown in the authentik UI."
646        ),
647        blank=True,
648    )
649    severity = models.TextField(
650        choices=NotificationSeverity.choices,
651        default=NotificationSeverity.NOTICE,
652        help_text=_("Controls which severity level the created notifications will have."),
653    )
654    destination_group = models.ForeignKey(
655        Group,
656        help_text=_(
657            "Define which group of users this notification should be sent and shown to. "
658            "If left empty, Notification won't ben sent."
659        ),
660        null=True,
661        blank=True,
662        on_delete=models.SET_NULL,
663    )
664    destination_event_user = models.BooleanField(
665        default=False,
666        help_text=_(
667            "When enabled, notification will be sent to user the user that triggered the event."
668            "When destination_group is configured, notification is sent to both."
669        ),
670    )
671
672    def destination_users(self, event: Event) -> Generator[User, Any]:
673        if self.destination_event_user and event.user.get("pk"):
674            yield User(pk=event.user.get("pk"))
675        if self.destination_group:
676            yield from self.destination_group.users.all()
677
678    @property
679    def serializer(self) -> type[Serializer]:
680        from authentik.events.api.notification_rules import NotificationRuleSerializer
681
682        return NotificationRuleSerializer
683
684    def __str__(self) -> str:
685        return f"Notification Rule {self.name}"
686
687    class Meta:
688        verbose_name = _("Notification Rule")
689        verbose_name_plural = _("Notification Rules")
690
691
692class NotificationWebhookMapping(PropertyMapping):
693    """Modify the payload of outgoing webhook requests"""
694
695    @property
696    def component(self) -> str:
697        return "ak-property-mapping-notification-form"
698
699    @property
700    def serializer(self) -> type[type[Serializer]]:
701        from authentik.events.api.notification_mappings import (
702            NotificationWebhookMappingSerializer,
703        )
704
705        return NotificationWebhookMappingSerializer
706
707    def __str__(self):
708        return f"Webhook Mapping {self.name}"
709
710    class Meta:
711        verbose_name = _("Webhook Mapping")
712        verbose_name_plural = _("Webhook Mappings")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
DISCORD_FIELD_LIMIT = 25
NOTIFICATION_SUMMARY_LENGTH = 75
def default_event_duration():
62def default_event_duration():
63    """Default duration an Event is saved.
64    This is used as a fallback when no brand is available"""
65    try:
66        tenant = get_current_tenant(only=["event_retention"])
67        return now() + timedelta_from_string(tenant.event_retention)
68    except Tenant.DoesNotExist:
69        return now() + timedelta(days=365)

Default duration an Event is saved. This is used as a fallback when no brand is available

def default_brand():
72def default_brand():
73    """Get a default value for brand"""
74    return sanitize_dict(model_to_dict(DEFAULT_BRAND))

Get a default value for brand

@lru_cache
def django_app_names() -> list[str]:
77@lru_cache
78def django_app_names() -> list[str]:
79    """Get a cached list of all django apps' names (not labels)"""
80    return [x.name for x in apps.app_configs.values()]

Get a cached list of all django apps' names (not labels)

class NotificationTransportError(authentik.lib.sentry.SentryIgnoredException):
83class NotificationTransportError(SentryIgnoredException):
84    """Error raised when a notification fails to be delivered"""

Error raised when a notification fails to be delivered

class EventAction(django.db.models.enums.TextChoices):
 87class EventAction(models.TextChoices):
 88    """All possible actions to save into the events log"""
 89
 90    LOGIN = "login"
 91    LOGIN_FAILED = "login_failed"
 92    LOGOUT = "logout"
 93
 94    USER_WRITE = "user_write"
 95    SUSPICIOUS_REQUEST = "suspicious_request"
 96    PASSWORD_SET = "password_set"  # noqa # nosec
 97
 98    SECRET_VIEW = "secret_view"  # noqa # nosec
 99    SECRET_ROTATE = "secret_rotate"  # noqa # nosec
100
101    INVITE_USED = "invitation_used"
102
103    AUTHORIZE_APPLICATION = "authorize_application"
104    SOURCE_LINKED = "source_linked"
105
106    IMPERSONATION_STARTED = "impersonation_started"
107    IMPERSONATION_ENDED = "impersonation_ended"
108
109    FLOW_EXECUTION = "flow_execution"
110    POLICY_EXECUTION = "policy_execution"
111    POLICY_EXCEPTION = "policy_exception"
112    PROPERTY_MAPPING_EXCEPTION = "property_mapping_exception"
113
114    SYSTEM_TASK_EXECUTION = "system_task_execution"
115    SYSTEM_TASK_EXCEPTION = "system_task_exception"
116    SYSTEM_EXCEPTION = "system_exception"
117
118    CONFIGURATION_ERROR = "configuration_error"
119    CONFIGURATION_WARNING = "configuration_warning"
120
121    MODEL_CREATED = "model_created"
122    MODEL_UPDATED = "model_updated"
123    MODEL_DELETED = "model_deleted"
124    EMAIL_SENT = "email_sent"
125
126    UPDATE_AVAILABLE = "update_available"
127
128    EXPORT_READY = "export_ready"
129
130    REVIEW_INITIATED = "review_initiated"
131    REVIEW_OVERDUE = "review_overdue"
132    REVIEW_ATTESTED = "review_attested"
133    REVIEW_COMPLETED = "review_completed"
134
135    CUSTOM_PREFIX = "custom_"

All possible actions to save into the events log

LOGIN_FAILED = EventAction.LOGIN_FAILED
USER_WRITE = EventAction.USER_WRITE
SUSPICIOUS_REQUEST = EventAction.SUSPICIOUS_REQUEST
PASSWORD_SET = EventAction.PASSWORD_SET
SECRET_VIEW = EventAction.SECRET_VIEW
SECRET_ROTATE = EventAction.SECRET_ROTATE
INVITE_USED = EventAction.INVITE_USED
AUTHORIZE_APPLICATION = EventAction.AUTHORIZE_APPLICATION
SOURCE_LINKED = EventAction.SOURCE_LINKED
IMPERSONATION_STARTED = EventAction.IMPERSONATION_STARTED
IMPERSONATION_ENDED = EventAction.IMPERSONATION_ENDED
FLOW_EXECUTION = EventAction.FLOW_EXECUTION
POLICY_EXECUTION = EventAction.POLICY_EXECUTION
POLICY_EXCEPTION = EventAction.POLICY_EXCEPTION
PROPERTY_MAPPING_EXCEPTION = EventAction.PROPERTY_MAPPING_EXCEPTION
SYSTEM_TASK_EXECUTION = EventAction.SYSTEM_TASK_EXECUTION
SYSTEM_TASK_EXCEPTION = EventAction.SYSTEM_TASK_EXCEPTION
SYSTEM_EXCEPTION = EventAction.SYSTEM_EXCEPTION
CONFIGURATION_ERROR = EventAction.CONFIGURATION_ERROR
CONFIGURATION_WARNING = EventAction.CONFIGURATION_WARNING
MODEL_CREATED = EventAction.MODEL_CREATED
MODEL_UPDATED = EventAction.MODEL_UPDATED
MODEL_DELETED = EventAction.MODEL_DELETED
EMAIL_SENT = EventAction.EMAIL_SENT
UPDATE_AVAILABLE = EventAction.UPDATE_AVAILABLE
EXPORT_READY = EventAction.EXPORT_READY
REVIEW_INITIATED = EventAction.REVIEW_INITIATED
REVIEW_OVERDUE = EventAction.REVIEW_OVERDUE
REVIEW_ATTESTED = EventAction.REVIEW_ATTESTED
REVIEW_COMPLETED = EventAction.REVIEW_COMPLETED
CUSTOM_PREFIX = EventAction.CUSTOM_PREFIX
138class Event(SerializerModel, ExpiringModel):
139    """An individual Audit/Metrics/Notification/Error Event"""
140
141    event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
142    user = models.JSONField(default=dict)
143    action = models.TextField(choices=EventAction.choices)
144    app = models.TextField()
145    context = models.JSONField(default=dict, blank=True)
146    client_ip = models.GenericIPAddressField(null=True)
147    created = models.DateTimeField(auto_now_add=True)
148    brand = models.JSONField(default=default_brand, blank=True)
149
150    # Shadow the expires attribute from ExpiringModel to override the default duration
151    expires = models.DateTimeField(default=default_event_duration)
152
153    @staticmethod
154    def _get_app_from_request(request: HttpRequest) -> str:
155        if not isinstance(request, HttpRequest):
156            return ""
157        return request.resolver_match.app_name
158
159    @staticmethod
160    def new(
161        action: str | EventAction,
162        app: str | None = None,
163        **kwargs,
164    ) -> Event:
165        """Create new Event instance from arguments. Instance is NOT saved."""
166        if not isinstance(action, EventAction):
167            action = EventAction.CUSTOM_PREFIX + action
168        if not app:
169            current = currentframe()
170            parent = current.f_back
171            app = parent.f_globals["__name__"]
172            # Attempt to match the calling module to the django app it belongs to
173            # if we can't find a match, keep the module name
174            django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
175            # Also ensure that closest django app has the correct prefix
176            if len(django_apps) > 0 and django_apps[0].startswith(app):
177                app = django_apps[0]
178        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
179        event = Event(action=action, app=app, context=cleaned_kwargs)
180        return event
181
182    def with_exception(self, exc: Exception) -> Event:
183        """Add data from 'exc' to the event in a database-saveable format"""
184        self.context.setdefault("message", str(exc))
185        self.context["exception"] = exception_to_dict(exc)
186        return self
187
188    def set_user(self, user: User) -> Event:
189        """Set `.user` based on user, ensuring the correct attributes are copied.
190        This should only be used when self.from_http is *not* used."""
191        self.user = get_user(user)
192        return self
193
194    def from_http(self, request: HttpRequest, user: User | None = None) -> Event:
195        """Add data from a Django-HttpRequest, allowing the creation of
196        Events independently from requests.
197        `user` arguments optionally overrides user from requests."""
198        if request:
199            from authentik.flows.views.executor import QS_QUERY
200
201            self.context["http_request"] = {
202                "path": request.path,
203                "method": request.method,
204                "args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
205                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
206            }
207            if hasattr(request, "request_id"):
208                self.context["http_request"]["request_id"] = request.request_id
209            # Special case for events created during flow execution
210            # since they keep the http query within a wrapped query
211            if QS_QUERY in self.context["http_request"]["args"]:
212                wrapped = self.context["http_request"]["args"][QS_QUERY]
213                self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
214        if hasattr(request, "brand"):
215            brand: Brand = request.brand
216            self.brand = sanitize_dict(model_to_dict(brand))
217        if hasattr(request, "user"):
218            self.user = get_user(request.user)
219        if user:
220            self.user = get_user(user)
221        if hasattr(request, "session"):
222            from authentik.flows.views.executor import SESSION_KEY_PLAN
223
224            # Check if we're currently impersonating, and add that user
225            if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
226                self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
227                self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
228            # Special case for events that happen during a flow, the user might not be authenticated
229            # yet but is a pending user instead
230            if SESSION_KEY_PLAN in request.session:
231                from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
232
233                plan: FlowPlan = request.session[SESSION_KEY_PLAN]
234                pending_user = plan.context.get(PLAN_CONTEXT_PENDING_USER, None)
235                # Only save `authenticated_as` if there's a different pending user in the flow
236                # than the user that is authenticated
237                if pending_user and (
238                    (pending_user.pk and pending_user.pk != self.user.get("pk"))
239                    or (not pending_user.pk)
240                ):
241                    orig_user = self.user.copy()
242
243                    self.user = {"authenticated_as": orig_user, **get_user(pending_user)}
244        # User 255.255.255.255 as fallback if IP cannot be determined
245        self.client_ip = ClientIPMiddleware.get_client_ip(request)
246        # Enrich event data
247        for processor in get_context_processors():
248            processor.enrich_event(self)
249        # If there's no app set, we get it from the requests too
250        if not self.app:
251            self.app = Event._get_app_from_request(request)
252        self.save()
253        return self
254
255    @staticmethod
256    def log_deprecation(
257        identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs
258    ):
259        query = Q(
260            action=EventAction.CONFIGURATION_WARNING,
261            context__deprecation=identifier,
262        )
263        if cause:
264            query &= Q(context__cause=cause)
265        if Event.objects.filter(query).exists():
266            return
267        event = Event.new(
268            EventAction.CONFIGURATION_WARNING,
269            deprecation=identifier,
270            message=message,
271            cause=cause,
272            **kwargs,
273        )
274        event.expires = now() + timedelta(days=expiry_days)
275        event.save()
276
277    def save(self, *args, **kwargs):
278        if self._state.adding:
279            LOGGER.info(
280                "Created Event",
281                action=self.action,
282                context=self.context,
283                client_ip=self.client_ip,
284                user=self.user,
285            )
286        super().save(*args, **kwargs)
287
288    @property
289    def serializer(self) -> type[Serializer]:
290        from authentik.events.api.events import EventSerializer
291
292        return EventSerializer
293
294    @property
295    def summary(self) -> str:
296        """Return a summary of this event."""
297        if "message" in self.context:
298            return self.context["message"]
299        return f"{self.action}: {self.context}"
300
301    @property
302    def hyperlink(self) -> str | None:
303        return self.context.get("hyperlink")
304
305    @property
306    def hyperlink_label(self) -> str | None:
307        return self.context.get("hyperlink_label")
308
309    def __str__(self) -> str:
310        return f"Event action={self.action} user={self.user} context={self.context}"
311
312    class Meta:
313        verbose_name = _("Event")
314        verbose_name_plural = _("Events")
315        indexes = ExpiringModel.Meta.indexes + [
316            models.Index(fields=["action"]),
317            models.Index(fields=["user"]),
318            models.Index(fields=["app"]),
319            models.Index(fields=["created"]),
320            models.Index(fields=["client_ip"]),
321            models.Index(
322                models.F("context__authorized_application"),
323                name="authentik_e_ctx_app__idx",
324            ),
325        ]

An individual Audit/Metrics/Notification/Error Event

def event_uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def action(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def app(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def context(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_ip(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def brand(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

@staticmethod
def new( action: str | EventAction, app: str | None = None, **kwargs) -> Event:
159    @staticmethod
160    def new(
161        action: str | EventAction,
162        app: str | None = None,
163        **kwargs,
164    ) -> Event:
165        """Create new Event instance from arguments. Instance is NOT saved."""
166        if not isinstance(action, EventAction):
167            action = EventAction.CUSTOM_PREFIX + action
168        if not app:
169            current = currentframe()
170            parent = current.f_back
171            app = parent.f_globals["__name__"]
172            # Attempt to match the calling module to the django app it belongs to
173            # if we can't find a match, keep the module name
174            django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
175            # Also ensure that closest django app has the correct prefix
176            if len(django_apps) > 0 and django_apps[0].startswith(app):
177                app = django_apps[0]
178        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
179        event = Event(action=action, app=app, context=cleaned_kwargs)
180        return event

Create new Event instance from arguments. Instance is NOT saved.

def with_exception(self, exc: Exception) -> Event:
182    def with_exception(self, exc: Exception) -> Event:
183        """Add data from 'exc' to the event in a database-saveable format"""
184        self.context.setdefault("message", str(exc))
185        self.context["exception"] = exception_to_dict(exc)
186        return self

Add data from 'exc' to the event in a database-saveable format

def set_user(self, user: authentik.core.models.User) -> Event:
188    def set_user(self, user: User) -> Event:
189        """Set `.user` based on user, ensuring the correct attributes are copied.
190        This should only be used when self.from_http is *not* used."""
191        self.user = get_user(user)
192        return self

Set .user based on user, ensuring the correct attributes are copied. This should only be used when self.from_http is not used.

def from_http( self, request: django.http.request.HttpRequest, user: authentik.core.models.User | None = None) -> Event:
194    def from_http(self, request: HttpRequest, user: User | None = None) -> Event:
195        """Add data from a Django-HttpRequest, allowing the creation of
196        Events independently from requests.
197        `user` arguments optionally overrides user from requests."""
198        if request:
199            from authentik.flows.views.executor import QS_QUERY
200
201            self.context["http_request"] = {
202                "path": request.path,
203                "method": request.method,
204                "args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
205                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
206            }
207            if hasattr(request, "request_id"):
208                self.context["http_request"]["request_id"] = request.request_id
209            # Special case for events created during flow execution
210            # since they keep the http query within a wrapped query
211            if QS_QUERY in self.context["http_request"]["args"]:
212                wrapped = self.context["http_request"]["args"][QS_QUERY]
213                self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
214        if hasattr(request, "brand"):
215            brand: Brand = request.brand
216            self.brand = sanitize_dict(model_to_dict(brand))
217        if hasattr(request, "user"):
218            self.user = get_user(request.user)
219        if user:
220            self.user = get_user(user)
221        if hasattr(request, "session"):
222            from authentik.flows.views.executor import SESSION_KEY_PLAN
223
224            # Check if we're currently impersonating, and add that user
225            if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
226                self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
227                self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
228            # Special case for events that happen during a flow, the user might not be authenticated
229            # yet but is a pending user instead
230            if SESSION_KEY_PLAN in request.session:
231                from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
232
233                plan: FlowPlan = request.session[SESSION_KEY_PLAN]
234                pending_user = plan.context.get(PLAN_CONTEXT_PENDING_USER, None)
235                # Only save `authenticated_as` if there's a different pending user in the flow
236                # than the user that is authenticated
237                if pending_user and (
238                    (pending_user.pk and pending_user.pk != self.user.get("pk"))
239                    or (not pending_user.pk)
240                ):
241                    orig_user = self.user.copy()
242
243                    self.user = {"authenticated_as": orig_user, **get_user(pending_user)}
244        # User 255.255.255.255 as fallback if IP cannot be determined
245        self.client_ip = ClientIPMiddleware.get_client_ip(request)
246        # Enrich event data
247        for processor in get_context_processors():
248            processor.enrich_event(self)
249        # If there's no app set, we get it from the requests too
250        if not self.app:
251            self.app = Event._get_app_from_request(request)
252        self.save()
253        return self

Add data from a Django-HttpRequest, allowing the creation of Events independently from requests. user arguments optionally overrides user from requests.

@staticmethod
def log_deprecation( identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs):
255    @staticmethod
256    def log_deprecation(
257        identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs
258    ):
259        query = Q(
260            action=EventAction.CONFIGURATION_WARNING,
261            context__deprecation=identifier,
262        )
263        if cause:
264            query &= Q(context__cause=cause)
265        if Event.objects.filter(query).exists():
266            return
267        event = Event.new(
268            EventAction.CONFIGURATION_WARNING,
269            deprecation=identifier,
270            message=message,
271            cause=cause,
272            **kwargs,
273        )
274        event.expires = now() + timedelta(days=expiry_days)
275        event.save()
def save(self, *args, **kwargs):
277    def save(self, *args, **kwargs):
278        if self._state.adding:
279            LOGGER.info(
280                "Created Event",
281                action=self.action,
282                context=self.context,
283                client_ip=self.client_ip,
284                user=self.user,
285            )
286        super().save(*args, **kwargs)

Save the current instance. Override this in a subclass if you want to control the saving process.

The 'force_insert' and 'force_update' parameters can be used to insist that the "save" must be an SQL insert or update (or equivalent for non-SQL backends), respectively. Normally, they should not be set.

serializer: type[rest_framework.serializers.Serializer]
288    @property
289    def serializer(self) -> type[Serializer]:
290        from authentik.events.api.events import EventSerializer
291
292        return EventSerializer

Get serializer for this model

summary: str
294    @property
295    def summary(self) -> str:
296        """Return a summary of this event."""
297        if "message" in self.context:
298            return self.context["message"]
299        return f"{self.action}: {self.context}"

Return a summary of this event.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def get_action_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_expires(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_expires(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

notification_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class Event.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Event.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class TransportMode(django.db.models.enums.TextChoices):
328class TransportMode(models.TextChoices):
329    """Modes that a notification transport can send a notification"""
330
331    LOCAL = "local", _("authentik inbuilt notifications")
332    WEBHOOK = "webhook", _("Generic Webhook")
333    WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)")
334    EMAIL = "email", _("Email")

Modes that a notification transport can send a notification

WEBHOOK_SLACK = TransportMode.WEBHOOK_SLACK
class NotificationTransport(authentik.tasks.models.TasksModel, authentik.lib.models.SerializerModel):
337class NotificationTransport(TasksModel, SerializerModel):
338    """Action which is executed when a Rule matches"""
339
340    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
341
342    name = models.TextField(unique=True)
343    mode = models.TextField(choices=TransportMode.choices, default=TransportMode.LOCAL)
344    send_once = models.BooleanField(
345        default=False,
346        help_text=_(
347            "Only send notification once, for example when sending a webhook into a chat channel."
348        ),
349    )
350
351    email_subject_prefix = models.TextField(default="authentik Notification: ", blank=True)
352    email_template = models.TextField(default=EmailTemplates.EVENT_NOTIFICATION)
353
354    webhook_url = models.TextField(blank=True, validators=[DomainlessURLValidator()])
355    webhook_ca = models.ForeignKey(
356        CertificateKeyPair,
357        null=True,
358        default=None,
359        on_delete=models.SET_DEFAULT,
360        help_text=_(
361            "When set, the selected ceritifcate is used to "
362            "validate the certificate of the webhook server."
363        ),
364    )
365    webhook_mapping_body = models.ForeignKey(
366        "NotificationWebhookMapping",
367        on_delete=models.SET_DEFAULT,
368        null=True,
369        default=None,
370        related_name="+",
371        help_text=_(
372            "Customize the body of the request. "
373            "Mapping should return data that is JSON-serializable."
374        ),
375    )
376    webhook_mapping_headers = models.ForeignKey(
377        "NotificationWebhookMapping",
378        on_delete=models.SET_DEFAULT,
379        null=True,
380        default=None,
381        related_name="+",
382        help_text=_(
383            "Configure additional headers to be sent. "
384            "Mapping should return a dictionary of key-value pairs"
385        ),
386    )
387
388    def send(self, notification: Notification) -> list[str]:
389        """Send notification to user, called from async task"""
390        if self.mode == TransportMode.LOCAL:
391            return self.send_local(notification)
392        if self.mode == TransportMode.WEBHOOK:
393            return self.send_webhook(notification)
394        if self.mode == TransportMode.WEBHOOK_SLACK:
395            return self.send_webhook_slack(notification)
396        if self.mode == TransportMode.EMAIL:
397            return self.send_email(notification)
398        raise ValueError(f"Invalid mode {self.mode} set")
399
400    def send_local(self, notification: Notification) -> list[str]:
401        """Local notification delivery"""
402        if self.webhook_mapping_body:
403            self.webhook_mapping_body.evaluate(
404                user=notification.user,
405                request=None,
406                notification=notification,
407            )
408        notification.save()
409        layer = get_channel_layer()
410        async_to_sync(layer.group_send)(
411            build_user_group(notification.user),
412            {
413                "type": "event.notification",
414                "id": str(notification.pk),
415                "data": notification.serializer(notification).data,
416            },
417        )
418        return []
419
420    def send_webhook(self, notification: Notification) -> list[str]:
421        """Send notification to generic webhook"""
422        default_body = {
423            "body": notification.body,
424            "severity": notification.severity,
425            "user_email": notification.user.email,
426            "user_username": notification.user.username,
427        }
428        if notification.event and notification.event.user:
429            default_body["event_user_email"] = notification.event.user.get("email", None)
430            default_body["event_user_username"] = notification.event.user.get("username", None)
431        headers = {}
432        if self.webhook_mapping_body:
433            default_body = sanitize_item(
434                self.webhook_mapping_body.evaluate(
435                    user=notification.user,
436                    request=None,
437                    notification=notification,
438                )
439            )
440        if self.webhook_mapping_headers:
441            headers = sanitize_item(
442                self.webhook_mapping_headers.evaluate(
443                    user=notification.user,
444                    request=None,
445                    notification=notification,
446                )
447            )
448
449        def send(**kwargs):
450            try:
451                response = get_http_session().post(
452                    self.webhook_url,
453                    json=default_body,
454                    headers=headers,
455                    **kwargs,
456                )
457                response.raise_for_status()
458            except RequestException as exc:
459                raise NotificationTransportError(
460                    exc.response.text if exc.response else str(exc)
461                ) from exc
462            return [
463                response.status_code,
464                response.text,
465            ]
466
467        if self.webhook_ca:
468            with DockerInlineTLS(self.webhook_ca, authentication_kp=None) as tls:
469                return send(verify=tls.ca_cert)
470        return send()
471
472    def send_webhook_slack(self, notification: Notification) -> list[str]:
473        """Send notification to slack or slack-compatible endpoints"""
474        fields = [
475            {
476                "title": _("Severity"),
477                "value": notification.severity,
478                "short": True,
479            },
480            {
481                "title": _("Dispatched for user"),
482                "value": str(notification.user),
483                "short": True,
484            },
485        ]
486        if notification.event:
487            if notification.event.user:
488                fields.append(
489                    {
490                        "title": _("Event user"),
491                        "value": str(notification.event.user.get("username")),
492                        "short": True,
493                    },
494                )
495            for key, value in notification.event.context.items():
496                if not isinstance(value, str):
497                    continue
498                # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
499                if len(fields) >= DISCORD_FIELD_LIMIT:
500                    continue
501                fields.append({"title": key[:256], "value": value[:1024]})
502        body = {
503            "username": "authentik",
504            "icon_url": "https://goauthentik.io/img/icon.png",
505            "attachments": [
506                {
507                    "author_name": "authentik",
508                    "author_link": "https://goauthentik.io",
509                    "author_icon": "https://goauthentik.io/img/icon.png",
510                    "title": notification.body,
511                    "color": "#fd4b2d",
512                    "fields": fields,
513                    "footer": f"authentik {authentik_full_version()}",
514                }
515            ],
516        }
517        if notification.event:
518            body["attachments"][0]["title"] = notification.event.action
519        try:
520            response = get_http_session().post(self.webhook_url, json=body)
521            response.raise_for_status()
522        except RequestException as exc:
523            text = exc.response.text if exc.response else str(exc)
524            raise NotificationTransportError(text) from exc
525        return [
526            response.status_code,
527            response.text,
528        ]
529
530    def send_email(self, notification: Notification) -> list[str]:
531        """Send notification via global email configuration"""
532        from authentik.stages.email.tasks import send_mail
533
534        if notification.user.email.strip() == "":
535            LOGGER.info(
536                "Discarding notification as user has no email address",
537                user=notification.user,
538                notification=notification,
539            )
540            return None
541        context = {
542            "key_value": {
543                "user_email": notification.user.email,
544                "user_username": notification.user.username,
545            },
546            "body": notification.body,
547            "title": "",
548        }
549        if notification.event and notification.event.user:
550            context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
551            context["key_value"]["event_user_username"] = notification.event.user.get(
552                "username", None
553            )
554        if notification.hyperlink:
555            context["link"] = {
556                "target": notification.hyperlink,
557                "label": notification.hyperlink_label,
558            }
559        if notification.event:
560            context["title"] += notification.event.action
561            for key, value in notification.event.context.items():
562                if not isinstance(value, str):
563                    continue
564                context["key_value"][key] = value
565        else:
566            context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
567        # TODO: improve permission check
568        if notification.user.is_superuser:
569            context["source"] = {
570                "from": self.name,
571            }
572        mail = TemplateEmailMessage(
573            subject=self.email_subject_prefix + context["title"],
574            to=[(notification.user.name, notification.user.email)],
575            language=notification.user.locale(),
576            template_name=self.email_template,
577            template_context=context,
578        )
579        send_mail.send_with_options(args=(mail.__dict__,), rel_obj=self)
580        return []
581
582    @property
583    def serializer(self) -> type[Serializer]:
584        from authentik.events.api.notification_transports import (
585            NotificationTransportSerializer,
586        )
587
588        return NotificationTransportSerializer
589
590    def __str__(self) -> str:
591        return f"Notification Transport {self.name}"
592
593    class Meta:
594        verbose_name = _("Notification Transport")
595        verbose_name_plural = _("Notification Transports")

Action which is executed when a Rule matches

def uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def send_once(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def email_subject_prefix(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def email_template(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def webhook_url(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

webhook_ca

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

webhook_mapping_body

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

webhook_mapping_headers

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def send(self, notification: Notification) -> list[str]:
388    def send(self, notification: Notification) -> list[str]:
389        """Send notification to user, called from async task"""
390        if self.mode == TransportMode.LOCAL:
391            return self.send_local(notification)
392        if self.mode == TransportMode.WEBHOOK:
393            return self.send_webhook(notification)
394        if self.mode == TransportMode.WEBHOOK_SLACK:
395            return self.send_webhook_slack(notification)
396        if self.mode == TransportMode.EMAIL:
397            return self.send_email(notification)
398        raise ValueError(f"Invalid mode {self.mode} set")

Send notification to user, called from async task

def send_local(self, notification: Notification) -> list[str]:
400    def send_local(self, notification: Notification) -> list[str]:
401        """Local notification delivery"""
402        if self.webhook_mapping_body:
403            self.webhook_mapping_body.evaluate(
404                user=notification.user,
405                request=None,
406                notification=notification,
407            )
408        notification.save()
409        layer = get_channel_layer()
410        async_to_sync(layer.group_send)(
411            build_user_group(notification.user),
412            {
413                "type": "event.notification",
414                "id": str(notification.pk),
415                "data": notification.serializer(notification).data,
416            },
417        )
418        return []

Local notification delivery

def send_webhook(self, notification: Notification) -> list[str]:
420    def send_webhook(self, notification: Notification) -> list[str]:
421        """Send notification to generic webhook"""
422        default_body = {
423            "body": notification.body,
424            "severity": notification.severity,
425            "user_email": notification.user.email,
426            "user_username": notification.user.username,
427        }
428        if notification.event and notification.event.user:
429            default_body["event_user_email"] = notification.event.user.get("email", None)
430            default_body["event_user_username"] = notification.event.user.get("username", None)
431        headers = {}
432        if self.webhook_mapping_body:
433            default_body = sanitize_item(
434                self.webhook_mapping_body.evaluate(
435                    user=notification.user,
436                    request=None,
437                    notification=notification,
438                )
439            )
440        if self.webhook_mapping_headers:
441            headers = sanitize_item(
442                self.webhook_mapping_headers.evaluate(
443                    user=notification.user,
444                    request=None,
445                    notification=notification,
446                )
447            )
448
449        def send(**kwargs):
450            try:
451                response = get_http_session().post(
452                    self.webhook_url,
453                    json=default_body,
454                    headers=headers,
455                    **kwargs,
456                )
457                response.raise_for_status()
458            except RequestException as exc:
459                raise NotificationTransportError(
460                    exc.response.text if exc.response else str(exc)
461                ) from exc
462            return [
463                response.status_code,
464                response.text,
465            ]
466
467        if self.webhook_ca:
468            with DockerInlineTLS(self.webhook_ca, authentication_kp=None) as tls:
469                return send(verify=tls.ca_cert)
470        return send()

Send notification to generic webhook

def send_webhook_slack(self, notification: Notification) -> list[str]:
472    def send_webhook_slack(self, notification: Notification) -> list[str]:
473        """Send notification to slack or slack-compatible endpoints"""
474        fields = [
475            {
476                "title": _("Severity"),
477                "value": notification.severity,
478                "short": True,
479            },
480            {
481                "title": _("Dispatched for user"),
482                "value": str(notification.user),
483                "short": True,
484            },
485        ]
486        if notification.event:
487            if notification.event.user:
488                fields.append(
489                    {
490                        "title": _("Event user"),
491                        "value": str(notification.event.user.get("username")),
492                        "short": True,
493                    },
494                )
495            for key, value in notification.event.context.items():
496                if not isinstance(value, str):
497                    continue
498                # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
499                if len(fields) >= DISCORD_FIELD_LIMIT:
500                    continue
501                fields.append({"title": key[:256], "value": value[:1024]})
502        body = {
503            "username": "authentik",
504            "icon_url": "https://goauthentik.io/img/icon.png",
505            "attachments": [
506                {
507                    "author_name": "authentik",
508                    "author_link": "https://goauthentik.io",
509                    "author_icon": "https://goauthentik.io/img/icon.png",
510                    "title": notification.body,
511                    "color": "#fd4b2d",
512                    "fields": fields,
513                    "footer": f"authentik {authentik_full_version()}",
514                }
515            ],
516        }
517        if notification.event:
518            body["attachments"][0]["title"] = notification.event.action
519        try:
520            response = get_http_session().post(self.webhook_url, json=body)
521            response.raise_for_status()
522        except RequestException as exc:
523            text = exc.response.text if exc.response else str(exc)
524            raise NotificationTransportError(text) from exc
525        return [
526            response.status_code,
527            response.text,
528        ]

Send notification to slack or slack-compatible endpoints

def send_email(self, notification: Notification) -> list[str]:
530    def send_email(self, notification: Notification) -> list[str]:
531        """Send notification via global email configuration"""
532        from authentik.stages.email.tasks import send_mail
533
534        if notification.user.email.strip() == "":
535            LOGGER.info(
536                "Discarding notification as user has no email address",
537                user=notification.user,
538                notification=notification,
539            )
540            return None
541        context = {
542            "key_value": {
543                "user_email": notification.user.email,
544                "user_username": notification.user.username,
545            },
546            "body": notification.body,
547            "title": "",
548        }
549        if notification.event and notification.event.user:
550            context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
551            context["key_value"]["event_user_username"] = notification.event.user.get(
552                "username", None
553            )
554        if notification.hyperlink:
555            context["link"] = {
556                "target": notification.hyperlink,
557                "label": notification.hyperlink_label,
558            }
559        if notification.event:
560            context["title"] += notification.event.action
561            for key, value in notification.event.context.items():
562                if not isinstance(value, str):
563                    continue
564                context["key_value"][key] = value
565        else:
566            context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
567        # TODO: improve permission check
568        if notification.user.is_superuser:
569            context["source"] = {
570                "from": self.name,
571            }
572        mail = TemplateEmailMessage(
573            subject=self.email_subject_prefix + context["title"],
574            to=[(notification.user.name, notification.user.email)],
575            language=notification.user.locale(),
576            template_name=self.email_template,
577            template_context=context,
578        )
579        send_mail.send_with_options(args=(mail.__dict__,), rel_obj=self)
580        return []

Send notification via global email configuration

serializer: type[rest_framework.serializers.Serializer]
582    @property
583    def serializer(self) -> type[Serializer]:
584        from authentik.events.api.notification_transports import (
585            NotificationTransportSerializer,
586        )
587
588        return NotificationTransportSerializer

Get serializer for this model

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

def get_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

webhook_ca_id
webhook_mapping_body_id
webhook_mapping_headers_id
def objects(unknown):

The type of the None singleton.

notificationrule_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

lifecyclerule_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class NotificationTransport.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class NotificationTransport.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class NotificationSeverity(django.db.models.enums.TextChoices):
598class NotificationSeverity(models.TextChoices):
599    """Severity images that a notification can have"""
600
601    NOTICE = "notice", _("Notice")
602    WARNING = "warning", _("Warning")
603    ALERT = "alert", _("Alert")

Severity images that a notification can have

class Notification(authentik.lib.models.SerializerModel):
606class Notification(SerializerModel):
607    """Event Notification"""
608
609    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
610    severity = models.TextField(choices=NotificationSeverity.choices)
611    body = models.TextField()
612    hyperlink = models.TextField(blank=True, null=True, max_length=4096)
613    hyperlink_label = models.TextField(blank=True, null=True)
614    created = models.DateTimeField(auto_now_add=True)
615    event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
616    seen = models.BooleanField(default=False)
617    user = models.ForeignKey(User, on_delete=models.CASCADE)
618
619    @property
620    def serializer(self) -> type[Serializer]:
621        from authentik.events.api.notifications import NotificationSerializer
622
623        return NotificationSerializer
624
625    def __str__(self) -> str:
626        body_trunc = (
627            (self.body[:NOTIFICATION_SUMMARY_LENGTH] + "..")
628            if len(self.body) > NOTIFICATION_SUMMARY_LENGTH
629            else self.body
630        )
631        return f"Notification for user {self.user_id}: {body_trunc}"
632
633    class Meta:
634        verbose_name = _("Notification")
635        verbose_name_plural = _("Notifications")

Event Notification

def uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def severity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def body(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

event

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def seen(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

serializer: type[rest_framework.serializers.Serializer]
619    @property
620    def serializer(self) -> type[Serializer]:
621        from authentik.events.api.notifications import NotificationSerializer
622
623        return NotificationSerializer

Get serializer for this model

def get_severity_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

event_id
user_id
def objects(unknown):

The type of the None singleton.

class Notification.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Notification.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

638class NotificationRule(TasksModel, SerializerModel, PolicyBindingModel):
639    """Decide when to create a Notification based on policies attached to this object."""
640
641    name = models.TextField(unique=True)
642    transports = models.ManyToManyField(
643        NotificationTransport,
644        help_text=_(
645            "Select which transports should be used to notify the user. If none are "
646            "selected, the notification will only be shown in the authentik UI."
647        ),
648        blank=True,
649    )
650    severity = models.TextField(
651        choices=NotificationSeverity.choices,
652        default=NotificationSeverity.NOTICE,
653        help_text=_("Controls which severity level the created notifications will have."),
654    )
655    destination_group = models.ForeignKey(
656        Group,
657        help_text=_(
658            "Define which group of users this notification should be sent and shown to. "
659            "If left empty, Notification won't ben sent."
660        ),
661        null=True,
662        blank=True,
663        on_delete=models.SET_NULL,
664    )
665    destination_event_user = models.BooleanField(
666        default=False,
667        help_text=_(
668            "When enabled, notification will be sent to user the user that triggered the event."
669            "When destination_group is configured, notification is sent to both."
670        ),
671    )
672
673    def destination_users(self, event: Event) -> Generator[User, Any]:
674        if self.destination_event_user and event.user.get("pk"):
675            yield User(pk=event.user.get("pk"))
676        if self.destination_group:
677            yield from self.destination_group.users.all()
678
679    @property
680    def serializer(self) -> type[Serializer]:
681        from authentik.events.api.notification_rules import NotificationRuleSerializer
682
683        return NotificationRuleSerializer
684
685    def __str__(self) -> str:
686        return f"Notification Rule {self.name}"
687
688    class Meta:
689        verbose_name = _("Notification Rule")
690        verbose_name_plural = _("Notification Rules")

Decide when to create a Notification based on policies attached to this object.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

transports

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

def severity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

destination_group

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def destination_event_user(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def destination_users( self, event: Event) -> Generator[authentik.core.models.User, typing.Any]:
673    def destination_users(self, event: Event) -> Generator[User, Any]:
674        if self.destination_event_user and event.user.get("pk"):
675            yield User(pk=event.user.get("pk"))
676        if self.destination_group:
677            yield from self.destination_group.users.all()
serializer: type[rest_framework.serializers.Serializer]
679    @property
680    def serializer(self) -> type[Serializer]:
681        from authentik.events.api.notification_rules import NotificationRuleSerializer
682
683        return NotificationRuleSerializer

Get serializer for this model

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

def get_severity_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

destination_group_id
policybindingmodel_ptr_id
policybindingmodel_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class NotificationRule.DoesNotExist(authentik.policies.models.PolicyBindingModel.DoesNotExist):

The requested object does not exist

class NotificationRule.MultipleObjectsReturned(authentik.policies.models.PolicyBindingModel.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class NotificationWebhookMapping(authentik.core.models.PropertyMapping):
693class NotificationWebhookMapping(PropertyMapping):
694    """Modify the payload of outgoing webhook requests"""
695
696    @property
697    def component(self) -> str:
698        return "ak-property-mapping-notification-form"
699
700    @property
701    def serializer(self) -> type[type[Serializer]]:
702        from authentik.events.api.notification_mappings import (
703            NotificationWebhookMappingSerializer,
704        )
705
706        return NotificationWebhookMappingSerializer
707
708    def __str__(self):
709        return f"Webhook Mapping {self.name}"
710
711    class Meta:
712        verbose_name = _("Webhook Mapping")
713        verbose_name_plural = _("Webhook Mappings")

Modify the payload of outgoing webhook requests

component: str
696    @property
697    def component(self) -> str:
698        return "ak-property-mapping-notification-form"

Return component used to edit this object

serializer: type[type[rest_framework.serializers.Serializer]]
700    @property
701    def serializer(self) -> type[type[Serializer]]:
702        from authentik.events.api.notification_mappings import (
703            NotificationWebhookMappingSerializer,
704        )
705
706        return NotificationWebhookMappingSerializer

Get serializer for this model

propertymapping_ptr_id
propertymapping_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

authenticatorsmsstage_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class NotificationWebhookMapping.DoesNotExist(authentik.core.models.PropertyMapping.DoesNotExist):

The requested object does not exist

class NotificationWebhookMapping.MultipleObjectsReturned(authentik.core.models.PropertyMapping.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.